Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/11 11:41:50 UTC

[GitHub] [iceberg] openinx commented on a change in pull request #1936: Flink: Default read parallelism to number of splits

openinx commented on a change in pull request #1936:
URL: https://github.com/apache/iceberg/pull/1936#discussion_r554950693



##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkTableOptions.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class FlinkTableOptions {
+
+  private FlinkTableOptions() {
+  }
+
+  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
+      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").defaultValue(true)

Review comment:
       Calling `defaultValue(true)` directly on the untyped `ConfigOptions.key(...)` builder is deprecated now. Let's use the typed builder instead:
   
   ```java
     public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
         ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").booleanType().defaultValue(true)
             .withDescription("If false, the source parallelism is set by config.\n" +
                 "If true, the source parallelism is inferred from the number of splits.\n");
   ```
   
   Do the same for `TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX`.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
       if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+        int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+        if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+          int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+          if (max < 1) {
+            throw new IllegalConfigurationException(
+                FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+          }
+
+          int splitNum = 0;
+          try {
+            FlinkInputSplit[] splits = format.createInputSplits(0);
+            splitNum = splits.length;
+          } catch (IOException e) {
+            throw new RuntimeException("get input split  error.", e);

Review comment:
       Nit: use `UncheckedIOException` here. 
   
   ```java
               throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
   ```
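   A minimal JDK-only sketch of the wrapping. `SplitPlanner` is a hypothetical stand-in for `FlinkInputFormat#createInputSplits` (which declares `IOException`), and `countSplits` is an illustrative name, not code from this PR.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   
   public class SplitCounting {
   
     // Hypothetical stand-in for FlinkInputFormat#createInputSplits, which throws IOException.
     public interface SplitPlanner {
       Object[] createInputSplits(int minNumSplits) throws IOException;
     }
   
     // Counts planned splits; a checked IOException is rethrown unchecked,
     // with a message naming the table so the failure is easy to trace.
     public static int countSplits(SplitPlanner planner, String table) {
       try {
         return planner.createInputSplits(0).length;
       } catch (IOException e) {
         throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
       }
     }
   
     public static void main(String[] args) {
       System.out.println(countSplits(min -> new Object[3], "db.t"));
       try {
         countSplits(min -> {
           throw new IOException("disk unavailable");
         }, "db.t");
       } catch (UncheckedIOException e) {
         // getCause() is typed as IOException on UncheckedIOException.
         System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
       }
     }
   }
   ```
   
   Compared with a bare `RuntimeException`, the exception type itself tells callers the failure came from I/O, and `getCause()` is statically typed as `IOException`.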

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
       if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+        int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

Review comment:
       How about moving these lines into a separate method?
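   One possible shape for that method, as a JDK-only sketch under stated assumptions: `inferParallelism` and its parameters are hypothetical names; the split count is passed as an `IntSupplier` so splits are only planned when inference is enabled; `IllegalStateException` stands in for Flink's `IllegalConfigurationException`; and clamping the inferred value to at least 1 is assumed from the "empty table, parallelism at least 1" case in `testParallelismOptimize`.
   
   ```java
   import java.util.function.IntSupplier;
   
   public class ParallelismInference {
   
     // Hypothetical extraction of the bounded-source parallelism logic.
     // splitCount is only invoked when inference is enabled, since planning splits is not free.
     public static int inferParallelism(int defaultParallelism, boolean inferEnabled,
                                        int maxInferred, IntSupplier splitCount, long limit) {
       int parallelism = defaultParallelism;
       if (inferEnabled) {
         if (maxInferred < 1) {
           // Stand-in for Flink's IllegalConfigurationException or a Preconditions check.
           throw new IllegalStateException("max inferred source parallelism cannot be less than 1");
         }
         // Assumption: never infer less than 1, so an empty table still gets one source task.
         parallelism = Math.max(1, Math.min(splitCount.getAsInt(), maxInferred));
       }
       // Compare in long space first; the result is <= parallelism, so narrowing is safe.
       return limit > 0 ? (int) Math.min(parallelism, limit) : parallelism;
     }
   
     public static void main(String[] args) {
       System.out.println(inferParallelism(-1, true, 100, () -> 2, 0)); // 2 splits
       System.out.println(inferParallelism(-1, true, 100, () -> 2, 1)); // LIMIT 1
       System.out.println(inferParallelism(-1, true, 100, () -> 0, 0)); // empty table
     }
   }
   ```
   
   Keeping the method free of Flink and Iceberg types makes the max, limit, and empty-table edge cases easy to unit-test without building a planner.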

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
       if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+        int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+        if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+          int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+          if (max < 1) {
+            throw new IllegalConfigurationException(
+                FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+          }
+
+          int splitNum = 0;
+          try {
+            FlinkInputSplit[] splits = format.createInputSplits(0);
+            splitNum = splits.length;
+          } catch (IOException e) {
+            throw new RuntimeException("get input split  error.", e);
+          }
+
+          parallelism = Math.min(splitNum, max);
+        }
+
+        parallelism = limit > 0 ? Math.min(parallelism, (int) limit) : parallelism;

Review comment:
       Casting the long `limit` to `int` may overflow. I'd prefer `(int) Math.min(parallelism, limit)`, which compares in `long` first; the result is at most `parallelism`, so the final cast is safe.
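   A small JDK-only demonstration of the difference; `narrowFirst` and `compareFirst` are illustrative names for the current and the suggested expression.
   
   ```java
   public class LimitCast {
   
     // Current pattern: narrows the long limit before comparing.
     public static int narrowFirst(int parallelism, long limit) {
       return limit > 0 ? Math.min(parallelism, (int) limit) : parallelism;
     }
   
     // Suggested pattern: compare as long, then narrow. The result is at most
     // parallelism, so the cast back to int cannot overflow.
     public static int compareFirst(int parallelism, long limit) {
       return limit > 0 ? (int) Math.min(parallelism, limit) : parallelism;
     }
   
     public static void main(String[] args) {
       long limit = 3_000_000_000L; // larger than Integer.MAX_VALUE
       System.out.println(narrowFirst(4, limit));  // -1294967296: (int) 3_000_000_000L wraps negative
       System.out.println(compareFirst(4, limit)); // 4
     }
   }
   ```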

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
     Assert.assertEquals("should have 1 record", 1, mixedResult.size());
     Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
   }
+
+  @Test
+  public void testParallelismOptimize() {
+    sql("INSERT INTO %s  VALUES (1,'hello')", TABLE_NAME);
+    sql("INSERT INTO %s  VALUES (2,'iceberg')", TABLE_NAME);
+
+    TableEnvironment tenv = getTableEnv();
+
+    // empty table ,parallelism at least 1
+    Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableEmpty, tenv);
+
+    // make sure to generate 2 CombinedScanTasks
+    org.apache.iceberg.Table table = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME));
+    Stream<FileScanTask> stream = StreamSupport.stream(table.newScan().planFiles().spliterator(), false);
+    Optional<FileScanTask> fileScanTaskOptional =  stream.max(Comparator.comparing(FileScanTask::length));
+    Assert.assertTrue(fileScanTaskOptional.isPresent());
+    long maxFileLen = fileScanTaskOptional.get().length();
+    sql("ALTER TABLE %s SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", TABLE_NAME, maxFileLen);
+
+    // 2 splits ,the parallelism is  2
+    Table tableSelect = tenv.sqlQuery(String.format("SELECT * FROM %s", TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(2, tableSelect, tenv);
+
+    // 2 splits  and limit is 1 ,the parallelism is  1
+    Table tableLimit = tenv.sqlQuery(String.format("SELECT * FROM %s LIMIT 1", TABLE_NAME));
+    testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
+  }
+
+  private void testParallelismSettingTranslateAndAssert(int expected, Table table, TableEnvironment tEnv) {

Review comment:
       Is there another way to assert that the parallelism matches the expected value? Here we're using Flink's planner to get the `ExecNode`, and I'm concerned that depending on Flink's `@Internal` code will cause real trouble when upgrading the Flink version. Please see this PR: https://github.com/apache/iceberg/pull/1956

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java
##########
@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
       Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
       FlinkInputFormat format = buildFormat();
       if (isBounded(context)) {
-        return env.createInput(format, rowTypeInfo);
+        int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
+        if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
+          int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
+          if (max < 1) {
+            throw new IllegalConfigurationException(
+                FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
+          }

Review comment:
       nit: use `Preconditions.checkState` here?

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
     Assert.assertEquals("should have 1 record", 1, mixedResult.size());
     Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
   }
+
+  @Test
+  public void testParallelismOptimize() {

Review comment:
       nit: rename `testParallelismOptimize` -> `testInferredParallelism`

##########
File path: flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
     Assert.assertEquals("should have 1 record", 1, mixedResult.size());
     Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
   }
+
+  @Test
+  public void testParallelismOptimize() {
+    sql("INSERT INTO %s  VALUES (1,'hello')", TABLE_NAME);
+    sql("INSERT INTO %s  VALUES (2,'iceberg')", TABLE_NAME);
+
+    TableEnvironment tenv = getTableEnv();
+
+    // empty table ,parallelism at least 1
+    Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", TABLE_NAME));

Review comment:
       nit: how about introducing a small method: 
   
   ```java
     private Table sqlQuery(String sql, Object... args) {
       return getTableEnv().sqlQuery(String.format(sql, args));
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org