Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/26 04:22:38 UTC

[GitHub] [flink] shouweikun opened a new pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

shouweikun opened a new pull request #13789:
URL: https://github.com/apache/flink/pull/13789


   …n blink planner
   
   ## What is the purpose of the change
   
   -  Implement `ParallelismProvider` support for sinks in the blink planner
   -  `SinkFunctionProvider` and `OutputFormatProvider` now implement `ParallelismProvider`, so they can carry a sink parallelism
   -  Prepare for follow-up work in which every `SinkRuntimeProvider` implementation currently used by the existing connectors also implements `ParallelismProvider`, so that the sink parallelism can be configured
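   To make the intended shape concrete, here is a self-contained Scala sketch of a sink provider that optionally carries a parallelism. `ParallelismProvider` below is a simplified stand-in that only mirrors the `getParallelism` call used in the diff (returning a `java.util.Optional[Integer]`); `SketchSinkProvider` and its `of` overloads are hypothetical and are not the Flink API.

   ```scala
   import java.util.Optional

   // Simplified stand-in for Flink's ParallelismProvider: only the method used in the diff.
   trait ParallelismProvider {
     def getParallelism: Optional[Integer]
   }

   // Hypothetical provider that wraps a sink name and an optional parallelism.
   final class SketchSinkProvider private (val sinkName: String, parallelism: Optional[Integer])
       extends ParallelismProvider {
     override def getParallelism: Optional[Integer] = parallelism
   }

   object SketchSinkProvider {
     // Factory without a parallelism: the planner decides.
     def of(sinkName: String): SketchSinkProvider =
       new SketchSinkProvider(sinkName, Optional.empty[Integer]())

     // Factory with an explicit parallelism; null is treated as "not specified".
     def of(sinkName: String, parallelism: Integer): SketchSinkProvider =
       new SketchSinkProvider(sinkName, Optional.ofNullable(parallelism))
   }
   ```

   An absent value leaves the choice to the planner, which can then fall back to the input parallelism.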
   
   
   ## Brief change log
   
   -  `CommonPhysicalSink`: reads the parallelism from `ParallelismProvider` when the runtime provider implements it (otherwise falls back to the input parallelism), validates it, and sets it on the sink transformation
   -  `SinkFunctionProvider`: implements `ParallelismProvider`; adds a factory method with a **parallelism** parameter that sets the sink parallelism
   -  `OutputFormatProvider`: implements `ParallelismProvider`; adds a factory method with a **parallelism** parameter that sets the sink parallelism
   -  Add the necessary tests and a helper class for testing the changes above
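   The resolution step in `CommonPhysicalSink` can be sketched as follows, assuming the provider's value arrives as a `java.util.Optional[Integer]` (as `getParallelism` does in the diff). `SinkParallelismResolver` is a hypothetical name, and only the lower bound is checked, in line with the review discussion about leaving the max-parallelism check to the DataStream layer.

   ```scala
   import java.util.Optional

   object SinkParallelismResolver {
     /**
      * Returns the parallelism for the sink transformation: the provider-specified
      * value if present, otherwise the parallelism of the input transformation.
      * Only non-positive values are rejected here.
      */
     def resolve(configured: Optional[Integer], inputParallelism: Int): Int = {
       val parallelism = configured.orElse(Integer.valueOf(inputParallelism)).intValue()
       if (parallelism <= 0) {
         throw new IllegalArgumentException(
           s"The configured sink parallelism $parallelism must be greater than zero.")
       }
       parallelism
     }
   }
   ```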
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     -  Add helper class `InternalDataStreamSinkProviderWithParallelism` to simplify testing a `DataStreamSinkProvider` combined with `ParallelismProvider`
     -  Add a `@Nullable` `parallelism` field to `TestValuesTableSink` for the sink-side `ParallelismProvider` tests
     -  Test that an exception is thrown when a `DataStreamSinkProvider` is combined with `ParallelismProvider`
     -  Test `SinkFunctionProvider` combined with `ParallelismProvider`
     -  Test `OutputFormatProvider` combined with `ParallelismProvider`

   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / *no*)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (*yes* / no)
     - The serializers: (yes / *no* / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / *no* / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / *no* / don't know)
     - The S3 file system connector: (yes / *no* / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165608



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism

Review comment:
      Since, per the discussion below, we don't need this check, it is fine to remove this line.







[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-717890188


   @flinkbot run azure





[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513165407



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism

Review comment:
      Correct me if I am wrong: `getMaxParallelism` is the upper bound on the parallelism that a task can use,
   while `getParallelism` is the parallelism that the task actually uses.
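   The relationship can be modeled in a few lines. This is a hypothetical sketch, not Flink code: `OperatorSettings` is an invented type that only encodes the rule described above (the parallelism actually used must be positive and at most the max parallelism), and it ignores the case where neither value has been set explicitly.

   ```scala
   // Hypothetical model of an operator's two parallelism settings.
   final case class OperatorSettings(parallelism: Int, maxParallelism: Int) {
     // maxParallelism is the upper bound; parallelism is what actually runs.
     require(maxParallelism > 0, s"maxParallelism must be positive, got $maxParallelism")
     require(parallelism > 0, s"parallelism must be positive, got $parallelism")
     require(parallelism <= maxParallelism,
       s"parallelism $parallelism exceeds maxParallelism $maxParallelism")

     // Returns a copy with a new parallelism; the constructor checks run again on the copy.
     def withParallelism(p: Int): OperatorSettings = copy(parallelism = p)
   }
   ```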







[GitHub] [flink] Aireed edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Aireed edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-1009713407


   @shouweikun Hello, what is the purpose of this check? A connector that supports CDC must have the same parallelism as its source, or have primary keys, even if it is not in changelog mode.
   ![image](https://user-images.githubusercontent.com/8862395/148908370-f9c8ce7a-0f22-438b-a3a0-d78cc0669fb7.png)
   





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513152300



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +84,7 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new RuntimeException("`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`")

Review comment:
      Please avoid overly long lines.
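   One way to shorten the line is to move the `throw` onto its own lines and split the message. A self-contained sketch in which `SinkRuntimeProvider`, `DataStreamSinkProvider`, `ParallelismProvider` and `SinkFunctionProvider` are simplified stand-in traits rather than the Flink interfaces, and `describe` is a hypothetical helper that mimics the first cases of the match:

   ```scala
   import java.util.Optional

   // Simplified stand-ins for the Flink interfaces named in the diff.
   trait SinkRuntimeProvider
   trait ParallelismProvider { def getParallelism: Optional[Integer] }
   trait DataStreamSinkProvider extends SinkRuntimeProvider
   trait SinkFunctionProvider extends SinkRuntimeProvider with ParallelismProvider

   object SinkTranslation {
     def describe(runtimeProvider: SinkRuntimeProvider): String = runtimeProvider match {
       // The compound type pattern matches only values implementing both traits.
       case _: DataStreamSinkProvider with ParallelismProvider =>
         throw new RuntimeException(
           "`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, " +
             "please see the documentation of `ParallelismProvider`.")
       case _: DataStreamSinkProvider => "data stream sink"
       case _: SinkFunctionProvider => "sink function"
       case other => throw new IllegalArgumentException(s"Unsupported provider: $other")
     }
   }
   ```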







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513152799



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism

Review comment:
      Should this be `env.getMaxParallelism`?

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")

Review comment:
      We don't need to verify against `env.getMaxParallelism`; the DataStream layer will verify it.







[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-719142137


   @JingsongLi  
   Comments addressed, and CI passed. Could you please review again?





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513153207



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")

Review comment:
      Why not just `parallelism <= 0`?
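   For `Int`, `implicitly[Ordering[Int]].lteq(a, b)` and `a <= b` give the same result, so the plain comparison can replace the `Ordering` call without changing behavior. A small sketch comparing both forms (the object and method names are hypothetical):

   ```scala
   object ParallelismCheck {
     // Form used in the diff: summon the implicit Ordering[Int] and call lteq.
     def isInvalidVerbose(parallelism: Int): Boolean =
       implicitly[Ordering[Int]].lteq(parallelism, 0)

     // Suggested form: a plain primitive comparison with the same meaning.
     def isInvalid(parallelism: Int): Boolean = parallelism <= 0
   }
   ```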







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

JingsongLi commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-720883830









[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513152977



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()

Review comment:
      Remove the `isInstanceOf` check; the provider must be a `ParallelismProvider` here.
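   If, as this PR proposes, `SinkFunctionProvider` itself implements `ParallelismProvider`, then inside the branch that matched `SinkFunctionProvider` the compiler already knows the value is a `ParallelismProvider`, so `getParallelism` can be called directly. A sketch using simplified stand-in traits and a hypothetical `sinkParallelism` helper:

   ```scala
   import java.util.Optional

   // Simplified stand-ins, not the Flink interfaces.
   trait SinkRuntimeProvider
   trait ParallelismProvider { def getParallelism: Optional[Integer] }
   // As proposed in this PR, the provider type itself implements ParallelismProvider.
   trait SinkFunctionProvider extends SinkRuntimeProvider with ParallelismProvider

   object DirectCall {
     // No runtime type check or cast: the static type already guarantees getParallelism exists.
     def sinkParallelism(provider: SinkFunctionProvider, inputParallelism: Int): Int =
       provider.getParallelism.orElse(Integer.valueOf(inputParallelism)).intValue()
   }
   ```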







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8609) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) 
   





[GitHub] [flink] flinkbot commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

flinkbot commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 6329930d7acc5d2d4d84e777561e79bd25ac9367 UNKNOWN
   





[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513164185



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()

Review comment:
      My concern is that once a new provider that does not implement `ParallelismProvider` is used, an exception will be thrown unless we check the type.
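   A type pattern with a fallback case would address this without `isInstanceOf`/`asInstanceOf`: providers that implement `ParallelismProvider` contribute their value, and any other provider, including one added later, keeps the input parallelism instead of failing a cast. A sketch with simplified stand-in traits; `LegacyProvider` and `SafeDispatch` are hypothetical:

   ```scala
   import java.util.Optional

   // Simplified stand-ins, not the Flink interfaces.
   trait SinkRuntimeProvider
   trait ParallelismProvider { def getParallelism: Optional[Integer] }
   // Hypothetical future provider that does not implement ParallelismProvider.
   trait LegacyProvider extends SinkRuntimeProvider

   object SafeDispatch {
     def sinkParallelism(provider: SinkRuntimeProvider, inputParallelism: Int): Int =
       provider match {
         // The type pattern both checks and binds, so no cast is needed.
         case p: ParallelismProvider =>
           p.getParallelism.orElse(Integer.valueOf(inputParallelism)).intValue()
         // Any other provider simply inherits the input parallelism.
         case _ => inputParallelism
       }
   }
   ```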







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 6329930d7acc5d2d4d84e777561e79bd25ac9367 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8265) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8487) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8609) 
   





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513152137



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +84,7 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new RuntimeException("`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, please see document of `ParallelismProvider`")

Review comment:
       Use `TableException` here.




[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513151741



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider{
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static OutputFormatProvider of(OutputFormat<RowData> outputFormat) {
-		return () -> outputFormat;
+		return of(outputFormat, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static OutputFormatProvider of(OutputFormat<RowData> outputFormat, Optional<Integer> parallelism) {

Review comment:
       ditto




[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       @JingsongLi 
   **it is recommended to use the Optional only in method return values**
   
   Copy that.
   
   **I think we don't need provide method.**
   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method that passes the parallelism in. Or is there a better alternative? It would be kind of you to tell me.
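A minimal, Flink-independent sketch of the approach discussed here: `Optional` appears only as the return type of `getParallelism()`, and the factory method takes a nullable `Integer` instead of an `Optional` parameter. All type names below (`ParallelismAware`, `SinkProviderSketch`) are hypothetical stand-ins, not Flink's actual interfaces.

```java
import java.util.Optional;

// Hypothetical stand-in for ParallelismProvider: Optional is used only as a return type.
interface ParallelismAware {
    Optional<Integer> getParallelism();
}

// Hypothetical stand-in for a sink runtime provider such as SinkFunctionProvider.
interface SinkProviderSketch<T> extends ParallelismAware {

    T createSink();

    // No explicit parallelism: getParallelism() stays empty and callers use their default.
    static <T> SinkProviderSketch<T> of(T sink) {
        return of(sink, null);
    }

    // A nullable Integer parameter instead of an Optional<Integer> parameter.
    static <T> SinkProviderSketch<T> of(T sink, Integer parallelism) {
        return new SinkProviderSketch<T>() {
            @Override
            public T createSink() {
                return sink;
            }

            @Override
            public Optional<Integer> getParallelism() {
                return Optional.ofNullable(parallelism);
            }
        };
    }
}
```

This keeps `Optional` out of parameter lists while still letting the planner distinguish "not set" from an explicitly configured value.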




[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701






[GitHub] [flink] Aireed commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
Aireed commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-1009713407


   @shouweikun Hello, what's the purpose of the check?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * fd0aa012ed1a15803fe2ed75180d60e6c9fe8d6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8491) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8504) 
   


[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513153908



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val containedRowKinds = changelogMode.getContainedKinds.toSet
+        val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+        else (containedRowKinds, primaryKeys.toList) match {
+        // fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.

Review comment:
       Yes, you can just check `changelogMode == ChangelogMode.INSERT_ONLY` (in Scala; Java should use `equals`), and do a keyBy for the other modes.




[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514019228



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {

Review comment:
       Yes




[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513954139



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/OutputFormatProvider.java
##########
@@ -20,13 +20,15 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
 /**
  * Provider of an {@link OutputFormat} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface OutputFormatProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface OutputFormatProvider

Review comment:
       Code format: it is OK to put this on one line.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +87,10 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException(

Review comment:
       Code format: better to break the line at `throw new Table...`
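The check being formatted here rejects a runtime provider that is both a `DataStreamSinkProvider` and a `ParallelismProvider`. A standalone sketch of that type test, assuming hypothetical marker interfaces in place of the Flink types and `IllegalArgumentException` in place of `TableException`, with the `throw` placed on its own line:

```java
// Hypothetical marker interfaces standing in for the Flink types named in the diff.
interface ParallelismProviderMarker {}

interface DataStreamSinkProviderMarker {}

final class SinkProviderValidation {

    private SinkProviderValidation() {}

    // Rejects a provider that implements both interfaces, mirroring the planner check in the diff.
    static void validate(Object runtimeProvider) {
        if (runtimeProvider instanceof DataStreamSinkProviderMarker
                && runtimeProvider instanceof ParallelismProviderMarker) {
            throw new IllegalArgumentException(
                    "`DataStreamSinkProvider` is not allowed to work with `ParallelismProvider`, "
                            + "please see the documentation of `ParallelismProvider`.");
        }
    }
}
```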

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {

Review comment:
       Why do we need to check `env.getMaxParallelism`?
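For context on this question, the `match` in the diff treats `-1` from `env.getMaxParallelism` as "not configured" and falls back to the job parallelism; any other non-positive value falls back to a default. The sketch below only restates those three branches in standalone Java; the value `128` is an assumption mirroring Flink's `KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM`, and the class name is hypothetical.

```java
final class KeyGroupNumSketch {

    // Assumed to mirror KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7).
    static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 1 << 7;

    private KeyGroupNumSketch() {}

    // Same three branches as the `env.getMaxParallelism match { ... }` block in the diff.
    static int keyGroupNum(int envMaxParallelism, int envParallelism) {
        if (envMaxParallelism == -1) {
            // -1: max parallelism not configured, fall back to the job parallelism.
            return envParallelism;
        } else if (envMaxParallelism > 0) {
            // A configured max parallelism is used as the number of key groups.
            return envMaxParallelism;
        } else {
            // Any other non-positive value: use the lower-bound default.
            return DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
        }
    }
}
```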

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {
+              case -1 => env.getParallelism
+              case x if(x > 0) => env.getMaxParallelism
+              case _ => DEFAULT_LOWER_BOUND_MAX_PARALLELISM
+            }
+            val partitioner = new KeyGroupStreamPartitioner(selector,keyGroupNum)

Review comment:
       `selector,keyGroupNum` => `selector, keyGroupNum`
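Roughly, a `KeyGroupStreamPartitioner` built from `selector` and `keyGroupNum` routes each record in two steps: key to key group, then key group to sink subtask. A simplified standalone sketch of that mapping; Flink applies a murmur hash to `key.hashCode()` before the modulo, whereas this sketch uses `Math.floorMod` on the plain hash code for brevity. The class name is hypothetical.

```java
final class KeyGroupRoutingSketch {

    private KeyGroupRoutingSketch() {}

    // Step 1: key -> key group. Simplification: Flink murmur-hashes key.hashCode() first.
    static int keyGroupFor(Object key, int numberOfKeyGroups) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }

    // Step 2: key group -> subtask index, using the same formula as
    // KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
    static int subtaskFor(int keyGroup, int numberOfKeyGroups, int parallelism) {
        return keyGroup * parallelism / numberOfKeyGroups;
    }

    // Full route for one record key; the same key always lands on the same subtask.
    static int route(Object key, int numberOfKeyGroups, int parallelism) {
        return subtaskFor(keyGroupFor(key, numberOfKeyGroups), numberOfKeyGroups, parallelism);
    }
}
```

With 2 key groups and a parallelism of 4, key groups 0 and 1 map to subtasks 0 and 2, so subtasks 1 and 3 receive nothing; the number of key groups should therefore not be smaller than the sink parallelism.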

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -79,6 +87,10 @@ class CommonPhysicalSink (
     val enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames)
 
     runtimeProvider match {
+      case _: DataStreamSinkProvider with ParallelismProvider => throw new TableException(
+        "`DataStreamSinkProvider` is not allowed to " +

Review comment:
       Code format: better to fill each line (max 100 chars).

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism

Review comment:
       Code format: don't break this line.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism

Review comment:
       Code format: 
   ```
   else {
     inputParallelism
   }
   ```
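With the suggested `if`/`else` layout, the resolution logic in this hunk amounts to: use the configured parallelism if present, reject non-positive values, otherwise fall back to the input parallelism. A Flink-independent sketch, assuming `IllegalArgumentException` in place of `TableException` and a hypothetical class name:

```java
import java.util.Optional;

final class SinkParallelismResolver {

    private SinkParallelismResolver() {}

    // Mirrors the `val parallelism = { ... }` block: a configured value wins, else the input's.
    static int resolve(Optional<Integer> configured, int inputParallelism, String tableIdentifier) {
        if (configured.isPresent()) {
            int passedIn = configured.get();
            if (passedIn <= 0) {
                throw new IllegalArgumentException(
                        "Table: " + tableIdentifier + " configured sink parallelism: "
                                + passedIn + " must be greater than zero.");
            }
            return passedIn;
        } else {
            return inputParallelism;
        }
    }
}
```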

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +

Review comment:
       Code format: better to fill each line (max 100 chars).

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)

Review comment:
       Don't break this line.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {

Review comment:
       I think it is better to use `if else` here.
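Following the suggestion to use `if`/`else` instead of matching on a tuple, the branch logic of this hunk could look like the standalone sketch below. It is hypothetical: the `Plan` enum stands in for the resulting transformation, `insertOnly` stands in for `changelogMode.containsOnly(RowKind.INSERT)`, and `IllegalStateException` replaces `TableException`.

```java
import java.util.List;

final class SinkInputPlanSketch {

    // Hypothetical outcomes for the transformation that feeds the sink.
    enum Plan { KEEP_INPUT, KEY_BY_PRIMARY_KEY }

    private SinkInputPlanSketch() {}

    static Plan choose(int inputParallelism, int sinkParallelism, boolean insertOnly,
                       List<Integer> primaryKeyIndices, String tableIdentifier) {
        if (inputParallelism == sinkParallelism) {
            // Parallelism unchanged: keep the input transformation as-is.
            return Plan.KEEP_INPUT;
        } else if (insertOnly) {
            // INSERT-only changelog: the diff also passes the input through unchanged.
            return Plan.KEY_BY_PRIMARY_KEY == null ? null : Plan.KEEP_INPUT;
        } else if (primaryKeyIndices.isEmpty()) {
            throw new IllegalStateException(
                    "Table: " + tableIdentifier + " configured sink parallelism is: " + sinkParallelism
                            + ", while the input parallelism is: " + inputParallelism
                            + ". Since the changelog mode is not INSERT_ONLY, a primary key is"
                            + " required but none was found.");
        } else {
            // Updates or deletes: key by the primary key so all changes for a key reach one subtask.
            return Plan.KEY_BY_PRIMARY_KEY;
        }
    }
}
```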

##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,13 +20,15 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider

Review comment:
       Code format: it is OK to put this on one line.




[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511713970



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val containedRowKinds = changelogMode.getContainedKinds.toSet
+        val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+        else (containedRowKinds, primaryKeys.toList) match {
+        // fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.

Review comment:
       Actually, this part needs more discussion.
   In my view, each changelog mode should be treated differently. So far I have only figured out what should be done in `UPSERT` mode and `INSERT_ONLY` mode.
   
   What should we do for the other changelog modes?
   WDYT?
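One possible way to frame the question above is as a decision over the contained row kinds and the primary key, sketched here under stated assumptions. The `RowKind` enum only mirrors the names of Flink's row kinds and is defined locally; the `Strategy` names and the policy itself (keep the input when parallelism is unchanged, rebalance append-only streams, hash by primary key when updates or deletes are present, reject otherwise) are hypothetical and are not what this PR or the planner necessarily implements.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: decides how the sink input could be redistributed when
// the configured sink parallelism differs from the input parallelism.
final class SinkRepartitioning {

    // Mirrors the names of Flink's RowKind values; defined locally to stay self-contained.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical distribution strategies (not a Flink API).
    enum Strategy { KEEP_INPUT, REBALANCE, HASH_BY_PRIMARY_KEY }

    static Strategy choose(
            int inputParallelism,
            int sinkParallelism,
            Set<RowKind> containedKinds,
            int[] primaryKeyIndices) {
        if (inputParallelism == sinkParallelism) {
            // Parallelism unchanged: keep the input transformation as it is.
            return Strategy.KEEP_INPUT;
        }
        if (containedKinds.equals(EnumSet.of(RowKind.INSERT))) {
            // Append-only stream: rows are independent, so any redistribution is safe.
            return Strategy.REBALANCE;
        }
        if (primaryKeyIndices.length > 0) {
            // Updates/deletes: route all changes for one key to the same subtask
            // so that their relative order is preserved.
            return Strategy.HASH_BY_PRIMARY_KEY;
        }
        // Without a key there is no safe way to keep the per-row change order
        // (this also covers a delete-only stream without a key).
        throw new IllegalStateException(
                "Cannot change the sink parallelism of a changelog stream without a primary key");
    }

    public static void main(String[] args) {
        System.out.println(choose(4, 2, EnumSet.of(RowKind.INSERT), new int[0])); // prints REBALANCE
        System.out.println(choose(4, 2,
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE),
                new int[] {0})); // prints HASH_BY_PRIMARY_KEY
    }
}
```

The design choice here is conservative: anything that is not append-only and has no key is rejected rather than silently redistributed.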







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 6329930d7acc5d2d4d84e777561e79bd25ac9367 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8265) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8487) 
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8570) 
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) 
   





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513169206



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       For users who want to set the parallelism, I think it is OK to let them create an implementation class.
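A minimal sketch of the alternative suggested above, assuming simplified local stand-ins: `ParallelismProvider`, `SinkFunction`, and `SinkFunctionProvider` below only mirror the shape of the Flink interfaces (with `String` in place of `RowData`), and `FixedParallelismSinkProvider` is a hypothetical connector class. The one-argument factory stays as it is, and a connector that needs a fixed parallelism implements the provider itself instead of calling an extra `of(sinkFunction, Optional<Integer>)` overload.

```java
import java.util.Optional;

// Hypothetical, self-contained stand-ins that mirror the shape of the interfaces
// discussed in this PR; they are NOT the Flink classes themselves.
interface ParallelismProvider {
    // Empty means "not configured": the planner decides (e.g. reuses the input parallelism).
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

interface SinkFunction<T> {
    void invoke(T value);
}

interface SinkFunctionProvider extends ParallelismProvider {
    SinkFunction<String> createSinkFunction();

    // The existing one-argument factory stays unchanged: no parallelism configured.
    static SinkFunctionProvider of(SinkFunction<String> sinkFunction) {
        return () -> sinkFunction;
    }
}

// A connector that wants a specific sink parallelism implements the provider itself.
final class FixedParallelismSinkProvider implements SinkFunctionProvider {
    private final SinkFunction<String> sinkFunction;
    private final Integer parallelism; // null means "not configured"

    FixedParallelismSinkProvider(SinkFunction<String> sinkFunction, Integer parallelism) {
        this.sinkFunction = sinkFunction;
        this.parallelism = parallelism;
    }

    @Override
    public SinkFunction<String> createSinkFunction() {
        return sinkFunction;
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(parallelism);
    }
}
```

Usage: `SinkFunctionProvider.of(fn)` reports an empty parallelism, while `new FixedParallelismSinkProvider(fn, 3).getParallelism()` reports `Optional[3]`.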







[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-720895918


   @JingsongLi Thank you for your patience!
   
   - can you create JIRAs for the sink parallelism support of connectors?
   Sure, my pleasure~





[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-717805408


   @JingsongLi 
   Hi, comments addressed! Would you please review the new changes?





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513168676



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()

Review comment:
       Adding a Scala `assert` is enough.
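Since `SinkFunctionProvider` and `OutputFormatProvider` implement `ParallelismProvider` in this PR, the `isInstanceOf` branch reduces to a sanity check. A rough Java rendering of the assert-then-resolve idea follows; `ParallelismProvider` is a local stand-in mirroring the Flink interface, and `resolve` is a hypothetical helper. Java's `assert` statement is disabled unless the JVM runs with `-ea`, so the sketch throws `AssertionError` explicitly.

```java
import java.util.Optional;

final class ParallelismResolution {

    // Local stand-in mirroring the shape of Flink's ParallelismProvider.
    interface ParallelismProvider {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    static int resolve(Object runtimeProvider, int inputParallelism) {
        // Every supported provider is expected to implement ParallelismProvider,
        // so a failed check is a programming error, not a user error.
        if (!(runtimeProvider instanceof ParallelismProvider)) {
            throw new AssertionError(
                    "runtimeProvider with `ParallelismProvider` implementation is required");
        }
        // Fall back to the input parallelism when nothing is configured.
        return ((ParallelismProvider) runtimeProvider)
                .getParallelism()
                .orElse(inputParallelism);
    }

    public static void main(String[] args) {
        ParallelismProvider unset = new ParallelismProvider() {};
        ParallelismProvider fixed = new ParallelismProvider() {
            @Override
            public Optional<Integer> getParallelism() {
                return Optional.of(2);
            }
        };
        System.out.println(resolve(unset, 8)); // prints 8
        System.out.println(resolve(fixed, 8)); // prints 2
    }
}
```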







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514053066



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +110,57 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+          "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {

Review comment:
       Note: the code style should be `if (`.







[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-718759841


   @flinkbot run azure





[GitHub] [flink] JingsongLi commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-718319057


   Thanks for the update, I will take a look in the next two days.





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * fd0aa012ed1a15803fe2ed75180d60e6c9fe8d6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8491) 
   





[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-718400385


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8570) 
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   





[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513163662



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")

Review comment:
       Sure. BTW, should a `TableException` also be thrown here instead of a `RuntimeException`?
   @JingsongLi 
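A small sketch of what the validation could look like with a table-specific exception, under assumptions: `TableException` below is a local stand-in (Flink's own `TableException` is likewise unchecked), `validate` is a hypothetical helper, and the message wording is illustrative. A plain `<=` comparison does the same job as `implicitly[Ordering[Int]].lteq` in the reviewed Scala code.

```java
final class SinkParallelismValidation {

    // Hypothetical stand-in for Flink's TableException, defined locally so the
    // sketch compiles on its own.
    static final class TableException extends RuntimeException {
        TableException(String message) {
            super(message);
        }
    }

    static int validate(int parallelism) {
        // A non-positive value comes from user configuration, so report it with a
        // table-level exception rather than a bare RuntimeException.
        if (parallelism <= 0) {
            throw new TableException(String.format(
                    "Invalid configured sink parallelism %d; it must be greater than 0.",
                    parallelism));
        }
        return parallelism;
    }

    public static void main(String[] args) {
        System.out.println(validate(4)); // prints 4
        try {
            validate(0);
        } catch (TableException e) {
            System.out.println(e.getMessage());
        }
    }
}
```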







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514077863



##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -386,9 +394,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 		boolean isInsertOnly = helper.getOptions().get(SINK_INSERT_ONLY);
 		String runtimeSink = helper.getOptions().get(RUNTIME_SINK);
 		int expectedNum = helper.getOptions().get(SINK_EXPECTED_MESSAGES_NUM);
+		Integer parallelism = helper.getOptions().get(SINK_PARALLELISM);
 		final Map<String, DataType> writableMetadata = convertToMetadataMap(
 			helper.getOptions().get(WRITABLE_METADATA),
 			context.getClassLoader());
+		ChangelogMode changelogMode = Optional.ofNullable(helper.getOptions().get(SINK_CHANGELOG_MODE_ENFORCED)).map(m -> parseChangelogMode(m)).orElse(null);

Review comment:
       This line is too long; please break it.
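One way to break the long line is to put each step of the `Optional` chain on its own line, and to use a method reference instead of the lambda. This is a sketch under assumptions: the options map, the key `"sink.changelog-mode-enforced"` and `parseChangelogMode` are hypothetical stand-ins (the real code reads `SINK_CHANGELOG_MODE_ENFORCED` through the factory helper and produces a `ChangelogMode`); here the parsed result is just a `Set<String>`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class ChangelogOptionExample {

    /** Hypothetical parser: "I,UA,D" becomes {"I", "UA", "D"}. */
    static Set<String> parseChangelogMode(String value) {
        return new LinkedHashSet<>(Arrays.asList(value.split(",")));
    }

    /** Returns the parsed mode, or null when the option is not set. */
    static Set<String> readEnforcedMode(Map<String, String> options) {
        // One call per line keeps the chain readable and within the line-length limit.
        return Optional.ofNullable(options.get("sink.changelog-mode-enforced"))
                .map(ChangelogOptionExample::parseChangelogMode)
                .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        System.out.println(readEnforcedMode(options)); // null
        options.put("sink.changelog-mode-enforced", "I,UA,D");
        System.out.println(readEnforcedMode(options)); // [I, UA, D]
    }
}
```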

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -929,10 +943,13 @@ public String asSummaryString() {
 		private DataType consumedDataType;
 		private int[] primaryKeyIndices;
 		private final String tableName;
-		private final boolean isInsertOnly;
+		private final boolean
+			isInsertOnly;

Review comment:
       Don't break this line.







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) 
   





[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r511716192



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val containedRowKinds = changelogMode.getContainedKinds.toSet
+        val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+        else (containedRowKinds, primaryKeys.toList) match {
+        // fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.
+        case (_, _) if(containedRowKinds == Set(RowKind.DELETE)) => inputTransformation
+        case (_, _) if(containedRowKinds == Set(RowKind.INSERT)) => inputTransformation
+        // fixme: for retract mode (insert and delete contains only), is there somethinng to do with? Currently do nothing.
+        case (_, _) if(containedRowKinds == Set(RowKind.INSERT,RowKind.DELETE)) => inputTransformation
+        case (_, Nil) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => throw new RuntimeException(s"ChangelogMode contains ${RowKind.UPDATE_AFTER}, but no primaryKeys were found")
+        case (_, _) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => new DataStream[RowData](env,inputTransformation).keyBy(primaryKeys:_*).getTransformation
+        case _ => throw new RuntimeException(s"the changelogMode is: ${containedRowKinds.mkString(",")}, which is not supported")
+      }
+

Review comment:
       I enumerated all the changelog mode conditions using a Scala match expression,
   so that each condition can easily be modified in its own case.
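For readers less familiar with Scala pattern matching, the cases in the patch above can be read as an ordered chain of checks. This sketch mirrors the patch version quoted here, not any final merged logic. `RowKind` is a local enum reusing the constant names of Flink's `org.apache.flink.types.RowKind`; `Decision` and `decide` are hypothetical names, and `IllegalStateException` stands in for the `RuntimeException` thrown in the patch.

```java
import java.util.EnumSet;
import java.util.Set;

public class SinkInputDecision {

    /** Local copy of the constant names of Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** What to do with the sink's input transformation. */
    enum Decision { KEEP_INPUT, KEY_BY_PRIMARY_KEY }

    static Decision decide(int inputParallelism, int sinkParallelism,
                           Set<RowKind> kinds, int[] primaryKeys) {
        if (inputParallelism == sinkParallelism) {
            return Decision.KEEP_INPUT; // parallelism unchanged: no re-partitioning
        }
        if (kinds.equals(EnumSet.of(RowKind.DELETE))
                || kinds.equals(EnumSet.of(RowKind.INSERT))
                || kinds.equals(EnumSet.of(RowKind.INSERT, RowKind.DELETE))) {
            return Decision.KEEP_INPUT; // cases the patch leaves as "do nothing"
        }
        if (kinds.contains(RowKind.UPDATE_AFTER)) {
            if (primaryKeys.length == 0) {
                throw new IllegalStateException(
                        "ChangelogMode contains UPDATE_AFTER, but no primary keys were found");
            }
            return Decision.KEY_BY_PRIMARY_KEY;
        }
        throw new IllegalStateException("Unsupported changelog mode: " + kinds);
    }

    public static void main(String[] args) {
        Set<RowKind> upsert = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE);
        System.out.println(decide(4, 4, upsert, new int[0]));                      // KEEP_INPUT
        System.out.println(decide(4, 2, EnumSet.of(RowKind.INSERT), new int[0])); // KEEP_INPUT
        System.out.println(decide(4, 2, upsert, new int[] {0}));                  // KEY_BY_PRIMARY_KEY
    }
}
```

Because the checks run in order, the equal-parallelism shortcut always wins, and the catch-all exception only fires for combinations no earlier case handles (for example `UPDATE_BEFORE` alone).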







[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-717800065


   @flinkbot run azure





[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514054506



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +110,57 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+          "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(s"Table: $tableIdentifier configured sink parallelism: " +
+                s"$parallelismPassedIn should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else {
+            inputParallelism
+          }
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation = if (inputParallelism == parallelism) {

Review comment:
       if (inputParallelism == parallelism || changelogMode.containsOnly(RowKind.INSERT)) {
   }
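The suggestion folds the insert-only case into the early exit, presumably because an insert-only stream carries no updates or retractions that must stay in per-key order. A plain-Java sketch of the condition follows. `containsOnly` mirrors what `ChangelogMode#containsOnly` is expected to check (exactly one kind present), and `canKeepInput` is a hypothetical name.

```java
import java.util.EnumSet;
import java.util.Set;

public class KeepInputCheck {

    /** Local copy of the constant names of Flink's RowKind. */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** True if the set holds exactly the given kind and nothing else. */
    static boolean containsOnly(Set<RowKind> kinds, RowKind kind) {
        return kinds.size() == 1 && kinds.contains(kind);
    }

    /** True when the input transformation can be used as-is for the sink. */
    static boolean canKeepInput(int inputParallelism, int sinkParallelism, Set<RowKind> kinds) {
        return inputParallelism == sinkParallelism || containsOnly(kinds, RowKind.INSERT);
    }

    public static void main(String[] args) {
        System.out.println(canKeepInput(4, 2, EnumSet.of(RowKind.INSERT)));                       // true
        System.out.println(canKeepInput(4, 2, EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER))); // false
        System.out.println(canKeepInput(2, 2, EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER))); // true
    }
}
```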







[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       @JingsongLi 
   > It is recommended to use `Optional` only in method return values.
   
   Copy that.
   
   > I think we don't need to provide this method.
   
   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? It would be kind of you to let me know.
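One way to follow the advice to use `Optional` only for return values is to accept a nullable `Integer` in the factory and wrap it only in the getter. This is a self-contained sketch, not the Flink API: `SinkFn`, `ParallelismAware` and `SinkProviderSketch` are hypothetical stand-ins for `SinkFunction<RowData>`, `ParallelismProvider` and `SinkFunctionProvider`. Whether an overloaded factory is wanted at all was still open in the thread.

```java
import java.util.Optional;

public class ProviderFactoryExample {

    /** Hypothetical stand-in for SinkFunction<RowData>. */
    interface SinkFn {
        void invoke(String row);
    }

    /** Hypothetical stand-in for ParallelismProvider: Optional only as a return type. */
    interface ParallelismAware {
        default Optional<Integer> getParallelism() {
            return Optional.empty();
        }
    }

    /** Hypothetical stand-in for SinkFunctionProvider. */
    interface SinkProviderSketch extends ParallelismAware {
        SinkFn createSinkFunction();

        static SinkProviderSketch of(SinkFn sinkFunction) {
            return of(sinkFunction, null);
        }

        /** @param parallelism sink parallelism, or null to keep the planner's default */
        static SinkProviderSketch of(SinkFn sinkFunction, Integer parallelism) {
            return new SinkProviderSketch() {
                @Override
                public SinkFn createSinkFunction() {
                    return sinkFunction;
                }

                @Override
                public Optional<Integer> getParallelism() {
                    return Optional.ofNullable(parallelism);
                }
            };
        }
    }

    public static void main(String[] args) {
        SinkFn printer = System.out::println;
        System.out.println(SinkProviderSketch.of(printer).getParallelism());    // Optional.empty
        System.out.println(SinkProviderSketch.of(printer, 3).getParallelism()); // Optional[3]
    }
}
```

The single-argument factory delegates to the two-argument one, so existing callers keep compiling while new callers can opt in to a fixed sink parallelism.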







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8570) 
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * fd0aa012ed1a15803fe2ed75180d60e6c9fe8d6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8491) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8504) 
   * a42eba2756d3ebfe1a6a73b58b31c12ff98435fb UNKNOWN
   





[GitHub] [flink] flinkbot commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716295445


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 6329930d7acc5d2d4d84e777561e79bd25ac9367 (Mon Oct 26 04:25:14 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   <details>
    <summary>Bot commands</summary>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       @JingsongLi 
   **It is recommended to use `Optional` only in method return values.**
   Copy that.
   **I think we don't need to provide this method.**
   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? It would be kind of you to let me know.







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513172411



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val containedRowKinds = changelogMode.getContainedKinds.toSet
+        val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+        else (containedRowKinds, primaryKeys.toList) match {
+        // fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.
+        case (_, _) if(containedRowKinds == Set(RowKind.DELETE)) => inputTransformation
+        case (_, _) if(containedRowKinds == Set(RowKind.INSERT)) => inputTransformation
+        // fixme: for retract mode (insert and delete contains only), is there somethinng to do with? Currently do nothing.
+        case (_, _) if(containedRowKinds == Set(RowKind.INSERT,RowKind.DELETE)) => inputTransformation
+        case (_, Nil) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => throw new RuntimeException(s"ChangelogMode contains ${RowKind.UPDATE_AFTER}, but no primaryKeys were found")
+        case (_, _) if(containedRowKinds.contains(RowKind.UPDATE_AFTER)) => new DataStream[RowData](env,inputTransformation).keyBy(primaryKeys:_*).getTransformation

Review comment:
       For how to do a `HASH_DISTRIBUTED` exchange, you should take a look at `StreamExecExchange`.
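Conceptually, a hash-distributed exchange routes each row by a hash of its key fields, so every change for the same key reaches the same downstream subtask. The sketch below illustrates only that idea with a simple `hashCode`-modulo rule; it is not how `StreamExecExchange` or Flink's key-group partitioning actually compute the target channel, and `targetChannel` and the `Object[]` row shape are hypothetical.

```java
import java.util.Arrays;

public class HashDistributionSketch {

    /** Picks a subtask index from the values of the key fields of a row. */
    static int targetChannel(Object[] row, int[] keyIndices, int parallelism) {
        Object[] key = new Object[keyIndices.length];
        for (int i = 0; i < keyIndices.length; i++) {
            key[i] = row[keyIndices[i]];
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Arrays.hashCode(key), parallelism);
    }

    public static void main(String[] args) {
        int[] pk = {0};
        Object[] before = {42L, "old"};
        Object[] after = {42L, "new"};
        // Same primary key, so both versions of the row go to the same channel.
        System.out.println(targetChannel(before, pk, 4) == targetChannel(after, pk, 4)); // true
    }
}
```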







[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513169070



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")
+        if (implicitly[Ordering[Int]].gt(parallelism, taskParallelism)) throw new RuntimeException(s"the configured sink parallelism: $parallelism is larger than the task max parallelism: $taskParallelism")
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val containedRowKinds = changelogMode.getContainedKinds.toSet
+        val theFinalInputTransformation = if(inputParallelism == parallelism) inputTransformation //if the parallelism is not changed, do nothing
+        else (containedRowKinds, primaryKeys.toList) match {
+        // fixme : if rowKinds only contains  delete, is there somethinng to do with? Currently do nothing.

Review comment:
       @JingsongLi  
   
   I only apply `keyBy` when the ChangelogMode contains `UPDATE_AFTER`; for the other changelog modes I keep the input transformation unchanged. Is that proper?
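The reasoning behind keying update streams can be illustrated by comparing where the changes for one key land. The simulation below is hypothetical (`Change`, `roundRobin` and `byKey` are illustrative names, not Flink APIs): with round-robin distribution the `-U`/`+U` pair for one key is split across subtasks, whose outputs may then be applied in any order, while key-based distribution keeps every change for a key on one subtask. Whether the same concern applies to the delete-only and retract cases is what the fixme comments in the patch leave open.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class ChangelogRoutingDemo {

    /** A changelog entry: a kind such as "+I", "-U", "+U" and a primary key. */
    static final class Change {
        final String kind;
        final String key;

        Change(String kind, String key) {
            this.kind = kind;
            this.key = key;
        }
    }

    /** Subtask index for each change under round-robin distribution. */
    static List<Integer> roundRobin(List<Change> changes, int parallelism) {
        List<Integer> channels = new ArrayList<>();
        for (int i = 0; i < changes.size(); i++) {
            channels.add(i % parallelism);
        }
        return channels;
    }

    /** Subtask index for each change under key-based distribution. */
    static List<Integer> byKey(List<Change> changes, int parallelism) {
        List<Integer> channels = new ArrayList<>();
        for (Change c : changes) {
            channels.add(Math.floorMod(c.key.hashCode(), parallelism));
        }
        return channels;
    }

    /** Number of distinct subtasks that received the changes. */
    static int distinctSubtasks(List<Integer> channels) {
        return new HashSet<>(channels).size();
    }

    public static void main(String[] args) {
        List<Change> changes = new ArrayList<>();
        changes.add(new Change("+I", "k1"));
        changes.add(new Change("-U", "k1"));
        changes.add(new Change("+U", "k1"));
        System.out.println(distinctSubtasks(roundRobin(changes, 2))); // 2: key k1 is split
        System.out.println(distinctSubtasks(byKey(changes, 2)));      // 1: key k1 stays together
    }
}
```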







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514055521



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +110,57 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+          "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(s"Table: $tableIdentifier configured sink parallelism: " +
+                s"$parallelismPassedIn should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else {
+            inputParallelism
+          }
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation = if (inputParallelism == parallelism) {
+          // if the inputParallelism is equals to the parallelism, do nothing.
+          inputTransformation
+        } else {
+          (changelogMode, primaryKeys.toList) match {
+            case (_, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+            case (_, Nil) =>

Review comment:
       `if (primaryKeys.isEmpty)`?
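A sketch of the same decision written as a plain if/else chain instead of a tuple pattern match, along the lines the comment suggests. `RowKind` mirrors the four constants of Flink's `org.apache.flink.types.RowKind`; `SinkInputPlan`, `SinkInputPlanner`, and the choice to fail when an updating stream has no primary key are assumptions for illustration, and `IllegalStateException` stands in for `TableException` to keep the sketch free of Flink dependencies.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Mirrors the constants of org.apache.flink.types.RowKind.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Hypothetical planning outcome; not a Flink type.
enum SinkInputPlan { KEEP_INPUT, KEY_BY_PRIMARY_KEY }

final class SinkInputPlanner {
    private SinkInputPlanner() {}

    static SinkInputPlan plan(
            String tableIdentifier,
            int inputParallelism,
            int sinkParallelism,
            Set<RowKind> changelogKinds,
            List<Integer> primaryKeyIndices) {
        // Unchanged parallelism: the sink consumes its input as-is.
        if (inputParallelism == sinkParallelism) {
            return SinkInputPlan.KEEP_INPUT;
        }
        // Insert-only input carries no per-key ordering constraints.
        if (changelogKinds.equals(EnumSet.of(RowKind.INSERT))) {
            return SinkInputPlan.KEEP_INPUT;
        }
        // `if (primaryKeys.isEmpty)` replaces the `case (_, Nil)` branch.
        // Assumption: without a key there is no safe way to redistribute updates.
        if (primaryKeyIndices.isEmpty()) {
            throw new IllegalStateException("Table: " + tableIdentifier
                    + " has no primary key, but its sink parallelism " + sinkParallelism
                    + " differs from the input parallelism " + inputParallelism
                    + " and the input contains updates or deletes.");
        }
        return SinkInputPlan.KEY_BY_PRIMARY_KEY;
    }
}
```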







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513151711



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       I don't think we need to provide this method.
   https://flink.apache.org/contributing/code-style-and-quality-java.html#java-optional
   The Flink code style recommends using `Optional` only for method return values.
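One way to follow that guideline, sketched with stand-in types: the factory takes a nullable `Integer`, and `Optional` appears only as the return type of `getParallelism()`. `ParallelismProvider` here is a simplified stand-in for the Flink interface, and `SketchSinkFunctionProvider` is hypothetical; it is generic in the sink function type so that it compiles without Flink on the classpath.

```java
import java.util.Optional;

// Simplified stand-in for org.apache.flink.table.connector.ParallelismProvider:
// Optional is used only as a return type.
interface ParallelismProvider {
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// Hypothetical provider; the real SinkFunctionProvider produces a SinkFunction<RowData>.
interface SketchSinkFunctionProvider<F> extends ParallelismProvider {
    F createSinkFunction();

    /** No parallelism configured: the sink inherits its input's parallelism. */
    static <F> SketchSinkFunctionProvider<F> of(F sinkFunction) {
        return of(sinkFunction, null);
    }

    /** @param parallelism sink parallelism, or null to inherit the input's. */
    static <F> SketchSinkFunctionProvider<F> of(F sinkFunction, Integer parallelism) {
        return new SketchSinkFunctionProvider<F>() {
            @Override
            public F createSinkFunction() {
                return sinkFunction;
            }

            @Override
            public Optional<Integer> getParallelism() {
                // The nullable parameter is wrapped only at the API boundary.
                return Optional.ofNullable(parallelism);
            }
        };
    }
}
```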







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514053753



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +110,57 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+          "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(s"Table: $tableIdentifier configured sink parallelism: " +
+                s"$parallelismPassedIn should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else {
+            inputParallelism

Review comment:
       There is no need to add a shuffle when `parallelismOptional` is empty.
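A minimal sketch of that point, assuming the planner first turns the provider's `getParallelism()` into a nullable `Integer`: when nothing is configured, the sink inherits the input parallelism and no redistribution is considered at all. `SinkParallelismResolution` and its fields are hypothetical names.

```java
// Hypothetical result of resolving the sink parallelism.
final class SinkParallelismResolution {
    final int parallelism;
    final boolean mayNeedShuffle;

    private SinkParallelismResolution(int parallelism, boolean mayNeedShuffle) {
        this.parallelism = parallelism;
        this.mayNeedShuffle = mayNeedShuffle;
    }

    /**
     * @param configuredParallelism value of getParallelism().orElse(null), i.e. null
     *     when the connector did not configure a sink parallelism
     */
    static SinkParallelismResolution resolve(Integer configuredParallelism, int inputParallelism) {
        if (configuredParallelism == null) {
            // Nothing configured: inherit the input parallelism, so no shuffle is needed.
            return new SinkParallelismResolution(inputParallelism, false);
        }
        int configured = configuredParallelism;
        // Only an explicitly different parallelism can require redistributing the input.
        return new SinkParallelismResolution(configured, configured != inputParallelism);
    }
}
```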







[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       @JingsongLi 
   > it is recommended to use the Optional only in method return values
   
   Copy that.
   
   > I think we don't need provide method.
   
   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? I'd appreciate it if you could point me to one.
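One possible alternative to a dedicated factory method, sketched under assumptions and not necessarily what the reviewer had in mind: a connector's own provider class overrides `getParallelism()` and reads the value from its table options. `ExampleSinkProvider` and the option key `sink.parallelism` are hypothetical here, and `ParallelismProvider` is a simplified stand-in for the Flink interface.

```java
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for org.apache.flink.table.connector.ParallelismProvider.
interface ParallelismProvider {
    default Optional<Integer> getParallelism() {
        return Optional.empty();
    }
}

// Hypothetical connector-specific provider that decides its own parallelism.
final class ExampleSinkProvider implements ParallelismProvider {
    // Hypothetical option key, used only for this illustration.
    static final String SINK_PARALLELISM_KEY = "sink.parallelism";

    private final Integer parallelism; // null when the option is not set

    ExampleSinkProvider(Map<String, String> tableOptions) {
        String raw = tableOptions.get(SINK_PARALLELISM_KEY);
        this.parallelism = raw == null ? null : Integer.valueOf(raw.trim());
    }

    @Override
    public Optional<Integer> getParallelism() {
        return Optional.ofNullable(parallelism);
    }
}
```

This keeps the static `of(...)` helpers minimal, at the cost of each connector wiring the option itself.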







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * a42eba2756d3ebfe1a6a73b58b31c12ff98435fb Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8514) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi merged pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #13789:
URL: https://github.com/apache/flink/pull/13789


   





[GitHub] [flink] shouweikun commented on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716296334


   @JingsongLi 
   I have pushed the PR, but some questions still need to be discussed. I have left some comments.





[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 6329930d7acc5d2d4d84e777561e79bd25ac9367 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8265) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 6329930d7acc5d2d4d84e777561e79bd25ac9367 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8265) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8487) 
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * fd0aa012ed1a15803fe2ed75180d60e6c9fe8d6d UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * fd0aa012ed1a15803fe2ed75180d60e6c9fe8d6d Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8491) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8504) 
   * a42eba2756d3ebfe1a6a73b58b31c12ff98435fb Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8514) 
   



[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513168776



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +105,33 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        val inputParallelism = inputTransformation.getParallelism
+        val taskParallelism = env.getParallelism
+        val parallelism = if (runtimeProvider.isInstanceOf[ParallelismProvider]) runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism.orElse(inputParallelism).intValue()
+        else inputParallelism
+
+        if (implicitly[Ordering[Int]].lteq(parallelism, 0)) throw new RuntimeException(s"the configured sink parallelism: $parallelism should not be less than zero or equal to zero")

Review comment:
       Yes, it should always throw `TableException`.
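A minimal sketch of the validation with a single, consistent exception type. `TableException` below is a local stand-in for `org.apache.flink.table.api.TableException` (unchecked, like the real class) so the snippet compiles on its own; `SinkParallelismValidator` is a hypothetical name, and the message wording follows the later revision of the diff.

```java
// Local stand-in for org.apache.flink.table.api.TableException.
class TableException extends RuntimeException {
    TableException(String message) {
        super(message);
    }
}

final class SinkParallelismValidator {
    private SinkParallelismValidator() {}

    /** Returns the configured parallelism if it is positive, otherwise throws TableException. */
    static int validate(String tableIdentifier, int configuredParallelism) {
        if (configuredParallelism <= 0) {
            throw new TableException(String.format(
                    "Table: %s configured sink parallelism: %d should not be less than or equal to zero.",
                    tableIdentifier, configuredParallelism));
        }
        return configuredParallelism;
    }
}
```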







[GitHub] [flink] JingsongLi commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514053753



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +110,57 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+          "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider.asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(s"Table: $tableIdentifier configured sink parallelism: " +
+                s"$parallelismPassedIn should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else {
+            inputParallelism

Review comment:
       There is no need to add a shuffle when `parallelismOptional` is empty.







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8570) 
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f UNKNOWN
   



[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r513167212



##########
File path: flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/sink/SinkFunctionProvider.java
##########
@@ -20,19 +20,39 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.connector.ParallelismProvider;
 import org.apache.flink.table.data.RowData;
 
+import java.util.Optional;
+
 /**
  * Provider of a {@link SinkFunction} instance as a runtime implementation for {@link DynamicTableSink}.
  */
 @PublicEvolving
-public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider {
+public interface SinkFunctionProvider extends DynamicTableSink.SinkRuntimeProvider, ParallelismProvider {
 
 	/**
 	 * Helper method for creating a static provider.
 	 */
 	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction) {
-		return () -> sinkFunction;
+		return of(sinkFunction, Optional.empty());
+	}
+
+	/**
+	 * Helper method for creating a static provider, sink parallelism will be configured if non-empty parallelism is passed in.
+	 */
+	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Optional<Integer> parallelism) {

Review comment:
       @JingsongLi 
   > it is recommended to use the Optional only in method return values
   
   Copy that.
   
   > I think we don't need provide method.
   
   Well, since `SinkFunctionProvider` implements `ParallelismProvider` by default, I think there should be a method for passing the parallelism in. Or is there a better alternative? I'd appreciate it if you could point me to one.







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   *  Unknown: [CANCELED](TBD) 
   * 5caa50b63ed2b7a172ee9c47aff4ea36e40cbb6b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8570) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] shouweikun commented on a change in pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
shouweikun commented on a change in pull request #13789:
URL: https://github.com/apache/flink/pull/13789#discussion_r514017050



##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalSink.scala
##########
@@ -99,11 +111,63 @@ class CommonPhysicalSink (
 
         val operator = new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer)
 
+        assert(runtimeProvider.isInstanceOf[ParallelismProvider],
+              "runtimeProvider with `ParallelismProvider` implementation is required")
+
+        val inputParallelism = inputTransformation.getParallelism
+        val parallelism =  {
+          val parallelismOptional = runtimeProvider
+            .asInstanceOf[ParallelismProvider].getParallelism
+          if(parallelismOptional.isPresent) {
+            val parallelismPassedIn = parallelismOptional.get().intValue()
+            if(parallelismPassedIn <= 0) {
+              throw new TableException(
+                s"Table: $tableIdentifier configured sink parallelism: $parallelismPassedIn " +
+                  "should not be less than zero or equal to zero")
+            }
+            parallelismPassedIn
+          } else inputParallelism
+        }
+
+        val primaryKeys = TableSchemaUtils.getPrimaryKeyIndices(catalogTable.getSchema)
+        val theFinalInputTransformation =
+          (inputParallelism == parallelism,changelogMode, primaryKeys.toList) match {
+           // if the inputParallelism equals parallelism, do nothing.
+          case (true, _, _) => inputTransformation
+          case (_, _, _) if (changelogMode.containsOnly(RowKind.INSERT)) => inputTransformation
+          case (_, _, Nil) =>
+            throw new TableException(
+            s"Table: $tableIdentifier configured sink parallelism is: $parallelism, " +
+            s"while the input parallelism is: $inputParallelism. " +
+            s"Since the changelog mode " +
+            s"contains [${changelogMode.getContainedKinds.toList.mkString(",")}], " +
+            s"which is not INSERT_ONLY mode, " +
+            s"primary key is required but no primary key is found"
+          )
+          case (_, _, pks) =>
+            //key by before sink
+            //according to [[StreamExecExchange]]
+            val selector = KeySelectorUtil.getRowDataSelector(
+              pks.toArray, inputTypeInfo)
+            // in case of maxParallelism is negative
+            val keyGroupNum = env.getMaxParallelism match {

Review comment:
       OK, so just use `DEFAULT_LOWER_BOUND_MAX_PARALLELISM`?
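The hunk above makes two decisions. First, it resolves the sink parallelism from `ParallelismProvider`, rejecting non-positive values and otherwise falling back to the input parallelism. Second, it picks how to connect the input to the sink. The following is a minimal standalone Scala sketch of that logic, without Flink classes. All names in it (`SinkParallelismSketch`, `resolveParallelism`, `chooseExchange`, `UseInputAsIs`, `KeyBy`) are hypothetical. The constant 128 is assumed to match `KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM`, the key-group count this reply proposes using, and, as far as we know, the value `StreamExecExchange` passes for hash distribution.

```scala
// Hypothetical model of the sink-parallelism logic in the diff; no Flink APIs are used.
object SinkParallelismSketch {

  // Assumed to mirror KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM (1 << 7).
  val DefaultLowerBoundMaxParallelism: Int = 1 << 7

  sealed trait Exchange
  // Connect the input transformation to the sink unchanged.
  case object UseInputAsIs extends Exchange
  // Key by the primary-key columns before the sink, using the given number of key groups.
  final case class KeyBy(primaryKeyIndices: List[Int], keyGroups: Int) extends Exchange

  /** Returns the configured parallelism if present and positive, else the input's. */
  def resolveParallelism(configured: Option[Int], inputParallelism: Int): Int =
    configured match {
      case Some(p) if p <= 0 =>
        throw new IllegalArgumentException(
          s"Configured sink parallelism $p must be greater than zero")
      case Some(p) => p
      case None    => inputParallelism
    }

  /** Mirrors the (sameParallelism, changelogMode, primaryKeys) match in the diff. */
  def chooseExchange(
      inputParallelism: Int,
      sinkParallelism: Int,
      insertOnly: Boolean,
      primaryKeys: List[Int]): Exchange =
    (inputParallelism == sinkParallelism, insertOnly, primaryKeys) match {
      case (true, _, _) => UseInputAsIs // same parallelism: nothing to do
      case (_, true, _) => UseInputAsIs // insert-only: row order per key does not matter
      case (_, _, Nil) =>
        throw new IllegalStateException(
          s"Sink parallelism $sinkParallelism differs from input parallelism " +
            s"$inputParallelism, the changelog is not insert-only, " +
            "and no primary key is defined")
      case (_, _, pks) => KeyBy(pks, DefaultLowerBoundMaxParallelism)
    }
}
```

Note that `UseInputAsIs` with differing parallelisms does not mean records are forwarded one-to-one. To our understanding, the DataStream API falls back to a rebalance between the two operators in that case.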







[GitHub] [flink] flinkbot edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-716299701


   ## CI report:
   
   * 934e4f0acf9044de5c6cb73ae25d094396bfc146 UNKNOWN
   * 8cca8ea8baa8f718138404c4a605f85f9c323723 UNKNOWN
   * eb7196c290cec28c792cb1d95089bf0f92b8f30f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8609) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8580) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] Aireed edited a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
Aireed edited a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-1009713407


   @shouweikun Hello, what is the purpose of this check? Must a connector that supports CDC either have the same parallelism as the source or have primary keys?
   ![image](https://user-images.githubusercontent.com/8862395/148908370-f9c8ce7a-0f22-438b-a3a0-d78cc0669fb7.png)
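One plausible reading of the check in the diff, under the usual assumption about changelog streams: when the sink parallelism differs from the input's, rows are redistributed between operators. Without keying, the `-U` and `+U` rows for one key may reach different sink subtasks and be written in a nondeterministic order, whereas keying by primary key keeps every change for a key on a single subtask. Insert-only streams have no such per-key dependency. The toy Scala model below illustrates this. `KeyedVsRoundRobin`, `roundRobin`, `byKey`, and `subtasksFor` are hypothetical names, and no Flink APIs are involved.

```scala
// Hypothetical illustration: to which sink subtasks do changelog rows for one key go?
object KeyedVsRoundRobin {
  final case class Row(kind: String, key: Int, value: String)

  // Round-robin assignment, similar in spirit to a rebalance: row i goes to subtask i % n.
  def roundRobin(rows: Seq[Row], n: Int): Seq[(Int, Row)] =
    rows.zipWithIndex.map { case (r, i) => (i % n, r) }

  // Hash assignment by key: every row with the same key goes to the same subtask.
  def byKey(rows: Seq[Row], n: Int): Seq[(Int, Row)] =
    rows.map(r => (java.lang.Math.floorMod(r.key.hashCode, n), r))

  // Number of distinct subtasks that received rows for the given key.
  def subtasksFor(key: Int, assigned: Seq[(Int, Row)]): Int =
    assigned.collect { case (s, r) if r.key == key => s }.distinct.size

  // An insert followed by an update of the same key, as a CDC source might emit.
  val rows: Seq[Row] = Seq(Row("+I", 1, "a"), Row("-U", 1, "a"), Row("+U", 1, "b"))
}
```

With two sink subtasks, the round-robin assignment spreads the three rows for key 1 over both subtasks, while the keyed assignment sends them all to one subtask.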
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] Aireed removed a comment on pull request #13789: [FLINK-19727][table-runtime] Implement ParallelismProvider for sink i…

Posted by GitBox <gi...@apache.org>.
Aireed removed a comment on pull request #13789:
URL: https://github.com/apache/flink/pull/13789#issuecomment-1009713407


   @shouweikun Hello, what is the purpose of this check? Must a connector that supports CDC either have the same parallelism as the source or have primary keys, even if it is not in changelog mode?
   ![image](https://user-images.githubusercontent.com/8862395/148908370-f9c8ce7a-0f22-438b-a3a0-d78cc0669fb7.png)
   

