Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/29 09:21:43 UTC

[GitHub] [flink-table-store] JingsongLi opened a new pull request #67: [FLINK-26911] Introduce parallelism setter for table store

JingsongLi opened a new pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67


   Support:
   - scan.parallelism
   - sink.parallelism
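
   For illustration only: the tests in this PR set these options through a
   query hint. A minimal sketch of how such a hinted query could be built,
   assuming Flink SQL's dynamic table options syntax. The helper class
   `HintedQuery` and the table name `T` are hypothetical; only the option keys
   come from this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of flink-table-store.
final class HintedQuery {

    /** Builds a SELECT with an OPTIONS hint, or a plain SELECT when no hints are given. */
    static String select(String table, Map<String, String> hints) {
        String base = "SELECT * FROM " + table;
        if (hints.isEmpty()) {
            return base;
        }
        // Render each entry as 'key' = 'value' inside /*+ OPTIONS(...) */
        StringJoiner options = new StringJoiner(", ", " /*+ OPTIONS(", ") */");
        hints.forEach((key, value) -> options.add("'" + key + "' = '" + value + "'"));
        return base + options;
    }

    public static void main(String[] args) {
        Map<String, String> hints = new LinkedHashMap<>();
        hints.put("scan.parallelism", "66");
        System.out.println(select("T", hints));
    }
}
```

   Without a hint the query is left untouched, which matches the "default"
   case exercised by the tests.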


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841016680



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -1713,4 +1748,74 @@ private static StreamExecutionEnvironment buildBatchEnv() {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {

Review comment:
       This test covers both the default and the hint.







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841003912



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
##########
@@ -260,6 +273,11 @@ public SourceBuilder withLogSourceProvider(LogSourceProvider logSourceProvider)
             return this;
         }
 
+        public SourceBuilder withParallelism(Integer parallelism) {

Review comment:
       Add a `@Nullable` annotation here, since `SCAN_PARALLELISM` has no default value.
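
       A minimal sketch of the idea, using a hypothetical stand-in builder
       rather than the real `TableStore.SourceBuilder`. Because the option has
       no default, the setter may receive `null`, and the caller then falls
       back to the environment's parallelism. The `@Nullable` annotation is
       shown as a comment so the sketch compiles without an annotation library;
       the method `effectiveParallelism` is an assumption made for illustration.

```java
// Hypothetical stand-in for the builder under review; not the actual class.
final class SketchSourceBuilder {

    // null means "scan.parallelism was not configured"
    private Integer parallelism;

    // In real code the parameter would be annotated, e.g. with javax.annotation.Nullable.
    SketchSourceBuilder withParallelism(/* @Nullable */ Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /** Returns the configured parallelism, or the environment default when none was set. */
    int effectiveParallelism(int envParallelism) {
        return parallelism != null ? parallelism : envParallelism;
    }

    public static void main(String[] args) {
        System.out.println(new SketchSourceBuilder().withParallelism(null).effectiveParallelism(2));
        System.out.println(new SketchSourceBuilder().withParallelism(66).effectiveParallelism(2));
    }
}
```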







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841005753



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -1713,4 +1748,74 @@ private static StreamExecutionEnvironment buildBatchEnv() {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {
+        String managedTable = createSourceAndManagedTable(false, false, false, false, false).f1;
+
+        // without hint
+        String query = prepareSimpleSelectQuery(managedTable, Collections.emptyMap());
+        assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism());
+
+        // with hint
+        query =
+                prepareSimpleSelectQuery(
+                        managedTable, Collections.singletonMap(SCAN_PARALLELISM.key(), "66"));
+        System.out.println(query);

Review comment:
       Remove `System.out.println(query);` 😆







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841004556



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
##########
@@ -298,17 +316,26 @@ private FileStoreSource buildFileSource(
             }
         }
 
-        public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+        public DataStreamSource<RowData> build() {
+            if (env == null) {
+                throw new IllegalArgumentException("Env should not be null.");

Review comment:
       I think we'd better clarify `Env` as `StreamExecutionEnvironment`.
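
       One possible shape for this, sketched with a hypothetical stand-in
       class. The real builder holds a Flink `StreamExecutionEnvironment`; a
       plain `Object` is used here as a placeholder so the example runs without
       Flink on the classpath, and the exact wording of the message is only a
       suggestion.

```java
// Hypothetical stand-in; illustrates only the null check and its message.
final class EnvCheckSketch {

    // Placeholder for StreamExecutionEnvironment
    private Object env;

    EnvCheckSketch withEnv(Object env) {
        this.env = env;
        return this;
    }

    String build() {
        if (env == null) {
            // Name the full type so the caller knows which argument is missing.
            throw new IllegalArgumentException(
                    "StreamExecutionEnvironment should not be null.");
        }
        return "built";
    }

    public static void main(String[] args) {
        try {
            new EnvCheckSketch().build();
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```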







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841016700



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -1713,4 +1748,74 @@ private static StreamExecutionEnvironment buildBatchEnv() {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {
+        String managedTable = createSourceAndManagedTable(false, false, false, false, false).f1;
+
+        // without hint
+        String query = prepareSimpleSelectQuery(managedTable, Collections.emptyMap());
+        assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism());
+
+        // with hint
+        query =
+                prepareSimpleSelectQuery(
+                        managedTable, Collections.singletonMap(SCAN_PARALLELISM.key(), "66"));
+        System.out.println(query);
+        assertThat(sourceParallelism(query)).isEqualTo(66);
+    }
+
+    private int sourceParallelism(String sql) {
+        DataStream<Row> stream = tEnv.toChangelogStream(tEnv.sqlQuery(sql));
+        return stream.getParallelism();
+    }
+
+    @Test
+    public void testSinkParallelism() {
+        testSinkParallelism(null, env.getParallelism());
+        testSinkParallelism(23, 23);
+    }
+

Review comment:
       This is for testing both the default and the hint.







[GitHub] [flink-table-store] JingsongLi merged pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67


   





[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841005052



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
##########
@@ -130,15 +135,29 @@ public DataStructureConverter createDataStructureConverter(
                             },
                             projectFields);
         }
-        TableStore.SourceBuilder builder =
+
+        TableStore.SourceBuilder sourceBuilder =
                 tableStore
                         .sourceBuilder()
                         .withContinuousMode(streaming)
                         .withLogSourceProvider(logSourceProvider)
                         .withProjection(projectFields)
                         .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
-                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
-        return SourceProvider.of(builder.build());
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters))
+                        .withParallelism(tableStore.options().get(SCAN_PARALLELISM));
+
+        return new DataStreamScanProvider() {
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, StreamExecutionEnvironment env) {
+                return sourceBuilder.withEnv(env).build();
+            }
+
+            @Override
+            public boolean isBounded() {
+                return !streaming;

Review comment:
       I'm not sure about this. Does the file store still count as a bounded scan when it does a hybrid read with a full scan?







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841006084



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -1713,4 +1748,74 @@ private static StreamExecutionEnvironment buildBatchEnv() {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {

Review comment:
       `testSetSourceParallelismByHint`?







[GitHub] [flink-table-store] JingsongLi commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841016341



##########
File path: flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
##########
@@ -130,15 +135,29 @@ public DataStructureConverter createDataStructureConverter(
                             },
                             projectFields);
         }
-        TableStore.SourceBuilder builder =
+
+        TableStore.SourceBuilder sourceBuilder =
                 tableStore
                         .sourceBuilder()
                         .withContinuousMode(streaming)
                         .withLogSourceProvider(logSourceProvider)
                         .withProjection(projectFields)
                         .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
-                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
-        return SourceProvider.of(builder.build());
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters))
+                        .withParallelism(tableStore.options().get(SCAN_PARALLELISM));
+
+        return new DataStreamScanProvider() {
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, StreamExecutionEnvironment env) {
+                return sourceBuilder.withEnv(env).build();
+            }
+
+            @Override
+            public boolean isBounded() {
+                return !streaming;

Review comment:
       Hybrid mode must be streaming.
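
       Put differently, boundedness follows directly from the streaming flag:
       a hybrid read only happens in streaming mode, so `!streaming` never
       reports it as bounded. A tiny sketch with a hypothetical enum, not the
       connector's actual types:

```java
// Hypothetical read modes for illustration; the connector itself only has a boolean flag.
final class BoundednessSketch {

    enum ReadMode {
        BATCH(false),
        STREAMING(true),
        HYBRID(true); // hybrid reading implies streaming

        final boolean streaming;

        ReadMode(boolean streaming) {
            this.streaming = streaming;
        }
    }

    /** Mirrors the `return !streaming;` line in the diff above. */
    static boolean isBounded(ReadMode mode) {
        return !mode.streaming;
    }

    public static void main(String[] args) {
        for (ReadMode mode : ReadMode.values()) {
            System.out.println(mode + " bounded=" + isBounded(mode));
        }
    }
}
```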







[GitHub] [flink-table-store] LadyForest commented on a change in pull request #67: [FLINK-26911] Introduce parallelism setter for table store

Posted by GitBox <gi...@apache.org>.
LadyForest commented on a change in pull request #67:
URL: https://github.com/apache/flink-table-store/pull/67#discussion_r841007101



##########
File path: flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
##########
@@ -1713,4 +1748,74 @@ private static StreamExecutionEnvironment buildBatchEnv() {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {
+        String managedTable = createSourceAndManagedTable(false, false, false, false, false).f1;
+
+        // without hint
+        String query = prepareSimpleSelectQuery(managedTable, Collections.emptyMap());
+        assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism());
+
+        // with hint
+        query =
+                prepareSimpleSelectQuery(
+                        managedTable, Collections.singletonMap(SCAN_PARALLELISM.key(), "66"));
+        System.out.println(query);
+        assertThat(sourceParallelism(query)).isEqualTo(66);
+    }
+
+    private int sourceParallelism(String sql) {
+        DataStream<Row> stream = tEnv.toChangelogStream(tEnv.sqlQuery(sql));
+        return stream.getParallelism();
+    }
+
+    @Test
+    public void testSinkParallelism() {
+        testSinkParallelism(null, env.getParallelism());
+        testSinkParallelism(23, 23);
+    }
+

Review comment:
       Nit: group test methods and util methods together?



