You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/31 14:35:05 UTC

[GitHub] [flink] SteNicholas opened a new pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

SteNicholas opened a new pull request #14822:
URL: https://github.com/apache/flink/pull/14822


   ## What is the purpose of the change
   
   *[FLIP-143](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143) introduced the unified sink API. `SinkRuntimeProvider` should be added for it and support it in Blink planner. Introduce a new provider `SinkProvider` for unified Sink API and implement in planner.*
   
   ## Brief change log
   
     - *Introduce `SinkProvider` as a runtime implementation for `DynamicTableSink`.*
     - *`CommonExecSink` adds the `SinkProvider` implement in the `createSinkTransformation`.*
   
   ## Verifying this change
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9512bc3eb5db31416c4f9c8fcd424a4be25c4143 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407) 
   * 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55066192b0e598f35321934ca8c1f5ea0f3d506c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r567610927



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.table.data.RowData;
+
+/** Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}. */
+@PublicEvolving
+public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider {

Review comment:
       extends `ParallelismProvider` as well?
   
   If we supports `ParallelismProvider` in this version, we should also consider the keyBy shuffle when paralleslim is not the same to the upstream operator. 
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas removed a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas removed a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-772362933


   @wuchong , thanks for your detailed review. I have followed up your comments and add `testSinkProvider` test case for `SinkProvider.` Please help to review again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r572746891



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
##########
@@ -922,6 +925,42 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
+  @Test
+  def testSinkProvider(): Unit = {
+    val file = tempFolder.newFolder()
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyFileSinkTable (
+         |  `a` STRING,
+         |  `b` STRING,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesink',
+         |  'path' = '${file.getAbsolutePath}'
+         |)
+         |""".stripMargin)
+
+    val stringTupleData3: Seq[(String, String, String)] = {
+      val data = new mutable.MutableList[(String, String, String)]
+      data.+=(("Test", "Sink", "Hi"))
+      data.+=(("Sink", "Provider", "Hello"))
+      data.+=(("Test", "Provider", "Hello world"))
+      data
+    }
+    val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c)
+    table.executeInsert("MyFileSinkTable").await()
+
+    val source = Source.fromFile(new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)

Review comment:
       @wuchong this could only verify the `FileSink` through `Source.fromFile` because the `FileSink` only write a inprogress file which file the `FileSource` doesn't read.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r569922375



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -192,6 +196,9 @@ public CommonExecSink(
                 finalInputTransform = new PartitionTransformation<>(inputTransform, partitioner);
                 finalInputTransform.setParallelism(parallelism);
             }
+
+            final SinkOperator operator =
+                    new SinkOperator(env.clean(sinkFunction), rowtimeFieldIndex, enforcer);

Review comment:
       I think we need to do some refactoring for the `createSinkTransformation` method, currently the logic is quite mess. One idea is we can make enforcer always as an single operator if there are not null fields, so the enforcer will not be wrap in the `SinkOperator`.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -170,6 +146,27 @@ public CommonExecSink(
                 parallelism = inputParallelism;
             }
 
+            final SinkFunction<RowData> sinkFunction;
+            if (runtimeProvider instanceof SinkFunctionProvider) {
+                sinkFunction = ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
+            } else if (runtimeProvider instanceof OutputFormatProvider) {
+                sinkFunction =
+                        new OutputFormatSinkFunction<>(
+                                ((OutputFormatProvider) runtimeProvider).createOutputFormat());
+            } else if (runtimeProvider instanceof SinkProvider) {
+                return new SinkTransformation<>(

Review comment:
       We should also add key partitioner if needed. 

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileSinkFactory.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.factories;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.filesystem.FileSystemOptions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Test file source {@link DynamicTableSinkFactory}. */
+public class TestFileSinkFactory implements DynamicTableSinkFactory {

Review comment:
       I would suggest to merge `TestFileSinkFactory` and `TestFileSourceFactory` into a single `TestFileFactory`, in this way, a registered table can both be used as sink and source. 

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
##########
@@ -922,6 +925,42 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
+  @Test
+  def testSinkProvider(): Unit = {
+    val file = tempFolder.newFolder()
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyFileSinkTable (
+         |  `a` STRING,
+         |  `b` STRING,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesink',
+         |  'path' = '${file.getAbsolutePath}'
+         |)
+         |""".stripMargin)
+
+    val stringTupleData3: Seq[(String, String, String)] = {
+      val data = new mutable.MutableList[(String, String, String)]
+      data.+=(("Test", "Sink", "Hi"))
+      data.+=(("Sink", "Provider", "Hello"))
+      data.+=(("Test", "Provider", "Hello world"))
+      data
+    }
+    val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c)
+    table.executeInsert("MyFileSinkTable").await()
+
+    val source = Source.fromFile(new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)

Review comment:
       If we merge the `filesink` and `filesource` into one connector, we can simply read `MyFileSinkTable` using SELECT query. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13661",
       "triggerID" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f555e1a49a573cac5e047eb20c75b944ef84ded3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13661) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r575254129



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
##########
@@ -922,6 +925,42 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
+  @Test
+  def testSinkProvider(): Unit = {
+    val file = tempFolder.newFolder()
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyFileSinkTable (
+         |  `a` STRING,
+         |  `b` STRING,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesink',
+         |  'path' = '${file.getAbsolutePath}'
+         |)
+         |""".stripMargin)
+
+    val stringTupleData3: Seq[(String, String, String)] = {
+      val data = new mutable.MutableList[(String, String, String)]
+      data.+=(("Test", "Sink", "Hi"))
+      data.+=(("Sink", "Provider", "Hello"))
+      data.+=(("Test", "Provider", "Hello world"))
+      data
+    }
+    val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c)
+    table.executeInsert("MyFileSinkTable").await()
+
+    val source = Source.fromFile(new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)

Review comment:
       @wuchong , this couldn't verify the result by simply reading `MyFileSinkTable` using `SELECT` query. Because the `TestFileTableSink` generates in progress file which file `FileSource` couldn't read.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong merged pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong merged pull request #14822:
URL: https://github.com/apache/flink/pull/14822


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9512bc3eb5db31416c4f9c8fcd424a4be25c4143 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407) 
   * 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9037cffc6cb4a69f455918ba9372d142051fabef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711) 
   * 55066192b0e598f35321934ca8c1f5ea0f3d506c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770392199


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 (Fri Feb 19 07:34:06 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9512bc3eb5db31416c4f9c8fcd424a4be25c4143 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r567610927



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/SinkProvider.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.table.data.RowData;
+
+/** Provider of a {@link Sink} instance as a runtime implementation for {@link DynamicTableSink}. */
+@PublicEvolving
+public interface SinkProvider extends DynamicTableSink.SinkRuntimeProvider {

Review comment:
       extends `ParallelismProvider` as well?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9037cffc6cb4a69f455918ba9372d142051fabef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711) 
   * 55066192b0e598f35321934ca8c1f5ea0f3d506c Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f1f95cd2649c7c2d75292ebc06d38ec7febcdad3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas removed a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas removed a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-780568737


   @wuchong , I have followed your detailed comments about the refactoring of `CommonExecSink#createSinkTransformation`. Please help to review again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9037cffc6cb4a69f455918ba9372d142051fabef UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas commented on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-780568737


   @wuchong , I have followed your detailed comments about the refactoring of `CommonExecSink#createSinkTransformation`. Please help to review again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r567614908



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -123,6 +126,10 @@ public CommonExecSink(
                 final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
                 return provider.consumeDataStream(dataStream).getTransformation();
             }
+        } else if (runtimeProvider instanceof SinkProvider) {
+            final Sink<RowData, ?, ?, ?> sink = ((SinkProvider) runtimeProvider).createSink();
+            return new SinkTransformation<>(

Review comment:
       We should also add enfore logic `SinkNotNullEnforcer`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r580011204



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -201,6 +185,31 @@ protected CommonExecSink(
                 finalInputTransform = new PartitionTransformation<>(inputTransform, partitioner);
                 finalInputTransform.setParallelism(parallelism);
             }
+
+            final SinkFunction<RowData> sinkFunction;
+            if (runtimeProvider instanceof SinkFunctionProvider) {
+                sinkFunction = ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
+            } else if (runtimeProvider instanceof OutputFormatProvider) {
+                OutputFormat<RowData> outputFormat =
+                        ((OutputFormatProvider) runtimeProvider).createOutputFormat();
+                sinkFunction = new OutputFormatSinkFunction<>(outputFormat);
+            } else if (runtimeProvider instanceof SinkProvider) {
+                return new SinkTransformation<>(
+                        finalInputTransform,
+                        ((SinkProvider) runtimeProvider).createSink(),

Review comment:
       This still misses to apply not-null-enforcer.
   Besides, could you add an IT case for not-null-enforcer + unified sink interface?

##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
##########
@@ -838,6 +841,44 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
+  @Test
+  def testSinkProvider(): Unit = {

Review comment:
       nit: what about `testUnifiedSinkInterface`? Currently, all the SinkRuntimeProviders have keywords of `Sink` and `Provider`. 

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
##########
@@ -31,27 +33,40 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.filesystem.FileSystemOptions;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
-/** Test file source {@link DynamicTableSourceFactory}. */
-public class TestFileSourceFactory implements DynamicTableSourceFactory {
+/**
+ * Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that
+ * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}.
+ */
+public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    // --------------------------------------------------------------------------------------------
+    // Factory
+    // --------------------------------------------------------------------------------------------
+
+    private static final String IDENTIFIER = "file";

Review comment:
       What about using `test-file` to make it more clearer it is only used for testing. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas commented on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-772362933


   @wuchong , thanks for your detailed review. I have followed up your comments and add `testSinkProvider` test case for `SinkProvider.` Please help to review again.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] SteNicholas commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
SteNicholas commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r572746891



##########
File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
##########
@@ -922,6 +925,42 @@ class TableSinkITCase extends StreamingTestBase {
 
   }
 
+  @Test
+  def testSinkProvider(): Unit = {
+    val file = tempFolder.newFolder()
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE MyFileSinkTable (
+         |  `a` STRING,
+         |  `b` STRING,
+         |  `c` STRING
+         |) WITH (
+         |  'connector' = 'filesink',
+         |  'path' = '${file.getAbsolutePath}'
+         |)
+         |""".stripMargin)
+
+    val stringTupleData3: Seq[(String, String, String)] = {
+      val data = new mutable.MutableList[(String, String, String)]
+      data.+=(("Test", "Sink", "Hi"))
+      data.+=(("Sink", "Provider", "Hello"))
+      data.+=(("Test", "Provider", "Hello world"))
+      data
+    }
+    val table = env.fromCollection(stringTupleData3).toTable(tEnv, 'a, 'b, 'c)
+    table.executeInsert("MyFileSinkTable").await()
+
+    val source = Source.fromFile(new File(file.getAbsolutePath, file.list()(0)).listFiles()(0).getAbsolutePath)

Review comment:
       @wuchong this could only verify the `FileSink` through `Source.fromFile` because the `FileSink` only write a inprogress file which file the `FileSource` doesn't read.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439) 
   * f1f95cd2649c7c2d75292ebc06d38ec7febcdad3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13661",
       "triggerID" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f1f95cd2649c7c2d75292ebc06d38ec7febcdad3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497) 
   * f555e1a49a573cac5e047eb20c75b944ef84ded3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13661) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9037cffc6cb4a69f455918ba9372d142051fabef Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55066192b0e598f35321934ca8c1f5ea0f3d506c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840) 
   * 9512bc3eb5db31416c4f9c8fcd424a4be25c4143 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439) 
   * f1f95cd2649c7c2d75292ebc06d38ec7febcdad3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4b0b591d6cc06d0eacc83f9277147d55af6ddf78 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770392199


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 9037cffc6cb4a69f455918ba9372d142051fabef (Sun Jan 31 14:37:48 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 9037cffc6cb4a69f455918ba9372d142051fabef Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 55066192b0e598f35321934ca8c1f5ea0f3d506c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840) 
   * 9512bc3eb5db31416c4f9c8fcd424a4be25c4143 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wuchong commented on a change in pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #14822:
URL: https://github.com/apache/flink/pull/14822#discussion_r567614908



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
##########
@@ -123,6 +126,10 @@ public CommonExecSink(
                 final DataStreamSinkProvider provider = (DataStreamSinkProvider) runtimeProvider;
                 return provider.consumeDataStream(dataStream).getTransformation();
             }
+        } else if (runtimeProvider instanceof SinkProvider) {
+            final Sink<RowData, ?, ?, ?> sink = ((SinkProvider) runtimeProvider).createSink();
+            return new SinkTransformation<>(

Review comment:
       We should also add not-null enfore logic `SinkNotNullEnforcer`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14822: [FLINK-21005][table-api] Introduce new provider for unified Sink API and implement in planner

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14822:
URL: https://github.com/apache/flink/pull/14822#issuecomment-770394744


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12711",
       "triggerID" : "9037cffc6cb4a69f455918ba9372d142051fabef",
       "triggerType" : "PUSH"
     }, {
       "hash" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12840",
       "triggerID" : "55066192b0e598f35321934ca8c1f5ea0f3d506c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13407",
       "triggerID" : "9512bc3eb5db31416c4f9c8fcd424a4be25c4143",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13439",
       "triggerID" : "4b0b591d6cc06d0eacc83f9277147d55af6ddf78",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497",
       "triggerID" : "f1f95cd2649c7c2d75292ebc06d38ec7febcdad3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "f555e1a49a573cac5e047eb20c75b944ef84ded3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f1f95cd2649c7c2d75292ebc06d38ec7febcdad3 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=13497) 
   * f555e1a49a573cac5e047eb20c75b944ef84ded3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org