Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/30 03:52:59 UTC

[GitHub] [flink] fsk119 opened a new pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

fsk119 opened a new pull request #13850:
URL: https://github.com/apache/flink/pull/13850


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
     Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   *Introduce the upsert-kafka table factory*
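   
   For illustration only (not part of the PR), a minimal sketch of how an 'upsert-kafka' table could be declared through the Table API. The option keys mirror the factory's required options shown later in this thread ('key.format', 'value.format', 'properties.bootstrap.servers'); the topic name, column names and the 'json' formats are assumptions:
   
   ```java
   // Minimal sketch, assuming the option keys of the regular Kafka connector
   // ('topic', 'properties.bootstrap.servers') carry over to 'upsert-kafka'.
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class UpsertKafkaSketch {
       public static void main(String[] args) {
           TableEnvironment tEnv = TableEnvironment.create(
                   EnvironmentSettings.newInstance().inStreamingMode().build());
   
           // The PRIMARY KEY is mandatory: it decides which columns are written to
           // and read from the Kafka message key.
           tEnv.executeSql(
                   "CREATE TABLE pageviews_per_region (" +
                   "  region STRING," +
                   "  view_count BIGINT," +
                   "  PRIMARY KEY (region) NOT ENFORCED" +
                   ") WITH (" +
                   "  'connector' = 'upsert-kafka'," +
                   "  'topic' = 'pageviews_per_region'," +
                   "  'properties.bootstrap.servers' = 'localhost:9092'," +
                   "  'key.format' = 'json'," +
                   "  'value.format' = 'json'" +
                   ")");
       }
   }
   ```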
   
   
   ## Brief change log
   
     - *Added table factory*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added table factory test*
  - *Added ITCases* (an illustrative test sketch follows below)
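   
   A rough illustration of the kind of check such a factory test can make (a sketch only; the identifier value comes from the factory code quoted in the review comments further down this thread):
   
   ```java
   // Illustrative sketch, not the PR's actual test class.
   import static org.junit.Assert.assertEquals;
   
   import org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory;
   import org.junit.Test;
   
   public class UpsertKafkaFactoryIdentifierSketchTest {
   
       @Test
       public void testFactoryIdentifier() {
           // The factory registers itself under the 'upsert-kafka' identifier.
           assertEquals("upsert-kafka", new UpsertKafkaDynamicTableFactory().factoryIdentifier());
       }
   }
   ```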
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 73ecb300a223c19e23506332846f40a531d8e723 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 73ecb300a223c19e23506332846f40a531d8e723 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043",
       "triggerID" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052",
       "triggerID" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8e4ebec4831e4e2800adb10eb12980869214674a Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wuchong commented on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
wuchong commented on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-721797062


   @fsk119 I helped to improve some code and fix the IT cases. Please take a look.
   
   Besides, I added a check to disallow a primary key on the insert-only 'kafka' table.
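   
   For readers of the archive, a hypothetical sketch of what such a check could look like (the actual code added in the PR may differ; the identifiers below appear in the factory sources quoted later in this thread):
   
   ```java
   // Hypothetical sketch: reject a PRIMARY KEY declared on the insert-only
   // 'kafka' connector and point users to 'upsert-kafka' instead.
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.api.ValidationException;
   
   final class InsertOnlyKafkaValidation {
       static void validateNoPrimaryKey(TableSchema schema) {
           if (schema.getPrimaryKey().isPresent()) {
               throw new ValidationException(
                   "The 'kafka' connector doesn't support a PRIMARY KEY constraint "
                       + "because it works with insert-only data. "
                       + "Please use the 'upsert-kafka' connector instead.");
           }
       }
   }
   ```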





[GitHub] [flink] wuchong closed pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
wuchong closed pull request #13850:
URL: https://github.com/apache/flink/pull/13850


   





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fb13ae80b253b874698f51ab065dc9438251b414 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779) 
   * d3a16055c657e05d255e37ab3711685cfc13efca UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3a16055c657e05d255e37ab3711685cfc13efca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 73ecb300a223c19e23506332846f40a531d8e723 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645) 
   * 16c0c99d7d74516d0d89899f1f868f436a669182 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043",
       "triggerID" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052",
       "triggerID" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 726b4e0a7a34dfd82e09cf9d04c98d722bb12297 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043) 
   * 8e4ebec4831e4e2800adb10eb12980869214674a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043",
       "triggerID" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052",
       "triggerID" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 45845058aa91089037420a3dea07901497e645d4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033) 
   * 726b4e0a7a34dfd82e09cf9d04c98d722bb12297 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043) 
   * 8e4ebec4831e4e2800adb10eb12980869214674a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9052) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027









[GitHub] [flink] wuchong commented on a change in pull request #13850: [FLINK-19858] Introduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
wuchong commented on a change in pull request #13850:
URL: https://github.com/apache/flink/pull/13850#discussion_r514910862



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);

Review comment:
       If we are introducing a new connector, I'd prefer not to support this option in the first version. It is not very useful and may be error-prone in upsert mode.
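
For illustration, a sketch of the narrowed option set this suggestion implies, assuming only `topic-pattern` is dropped and the remaining options stay as in the diff above (a fragment of the factory class, not a self-contained file):

```java
	@Override
	public Set<ConfigOption<?>> optionalOptions() {
		final Set<ConfigOption<?>> options = new HashSet<>();
		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
		options.add(KafkaOptions.TOPIC);
		// KafkaOptions.TOPIC_PATTERN intentionally left out for the first version.
		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
		return options;
	}
```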

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as sink only supports insert-only encoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validatePKConstraints(TableSchema schema) {
+		if (!schema.getPrimaryKey().isPresent()) {
+			throw new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.");

Review comment:
       ```suggestion
   			throw new ValidationException(
   				"'upsert-kafka' tables require to define a PRIMARY KEY constraint. " +
   					"The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. " +
   					"The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.");
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it partitions records by a hash of the key if a key is set, otherwise round-robin.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}

Review comment:
       ```suggestion
   		if (!keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
   			String identifier = tableOptions.get(FactoryUtil.KEY_FORMAT);
   			throw new ValidationException(String.format(
   				"'upsert-kafka' connector doesn't support '%s' as key format, " +
   					"because '%s' is not in insert-only mode.",
   				identifier,
   				identifier));
   		}
   		if (!valueFormat.getChangelogMode().containsOnly(RowKind.INSERT)) {
   			String identifier = tableOptions.get(FactoryUtil.VALUE_FORMAT);
   			throw new ValidationException(String.format(
   				"'upsert-kafka' connector doesn't support '%s' as value format, " +
   					"because '%s' is not in insert-only mode.",
   				identifier,
   				identifier));
   		}
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it partitions records by a hash of the key if a key is set, otherwise round-robin.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||

Review comment:
       keyFormat and valueFormat must both be insert-only; you can check this with `keyFormat.getChangelogMode().containsOnly(RowKind.INSERT)`.
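   A minimal sketch of a shared helper along these lines (name and error message are illustrative, not part of this PR); both `validateTableSourceOptions` and `validateTableSinkOptions` could call it for the key and the value format:
   ```java
   private static void validateFormatIsInsertOnly(ChangelogMode mode, String formatOptionKey) {
   	// both the key and the value format of 'upsert-kafka' must be insert-only
   	if (!mode.containsOnly(RowKind.INSERT)) {
   		throw new ValidationException(String.format(
   			"'upsert-kafka' connector requires an insert-only format for '%s'.", formatOptionKey));
   	}
   }
   ```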

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
+		final LogicalType physicalType = physicalDataType.getLogicalType();
+		Preconditions.checkArgument(
+			hasRoot(physicalType, LogicalTypeRoot.ROW),
+			"Row data type expected.");
+		final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+		final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);
+
+		if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+			throw new ValidationException(
+					String.format(
+							"The option '%s' can only be declared if a key format is defined using '%s'.",
+							KEY_FIELDS.key(),
+							KEY_FORMAT.key()
+					)
+			);
+		} else if (optionalKeyFormat.isPresent() &&
+				(!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+			throw new ValidationException(
+					String.format(
+							"A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+							KEY_FORMAT.key(),
+							KEY_FIELDS.key()
+					)
+			);
+		}
+
+		if (!optionalKeyFormat.isPresent()) {
+			return new int[0];
+		}
+
+		final List<String> keyFields = optionalKeyFields.get();
+		final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+		return keyFields.stream()
+				.mapToInt(keyField -> {
+					final int pos = physicalFields.indexOf(keyField);
+					if (pos < 0) {
+						throw new ValidationException(
+								String.format(
+										"Could not find the field '%s' in the table schema for usage in the key format. "
+												+ "A key field must be a regular, physical column. "
+												+ "The following columns can be selected in the '%s' option:\n"
+												+ "%s",
+										keyField,
+										KEY_FIELDS.key(),
+										physicalFields
+								)
+						);
+					}
+					return pos;
+				})
+			.toArray();

Review comment:
       We can extract this logic so that it can be reused for the key projection derived from the primary key information.
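   A rough sketch of such an extracted helper (hypothetical name and error message):
   ```java
   private static int[] toFieldIndices(List<String> fieldNames, List<String> physicalFields) {
   	// maps field names to their positions in the physical row type; reusable both for the
   	// 'key.fields' option and for a key projection derived from the primary key columns
   	return fieldNames.stream()
   		.mapToInt(fieldName -> {
   			final int pos = physicalFields.indexOf(fieldName);
   			if (pos < 0) {
   				throw new ValidationException(String.format(
   					"Could not find the field '%s' in the physical columns %s.",
   					fieldName, physicalFields));
   			}
   			return pos;
   		})
   		.toArray();
   }
   ```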

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {

Review comment:
       I don't know why Timo put these methods and options in the common `FactoryUtil`. IMO, they should belong to `KafkaOptions`, because this is a very Kafka-specific feature.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -343,6 +380,92 @@ public static void validateUnconsumedKeys(
 		}
 	}
 
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the key format and the order that those fields have in the key format.
+	 *
+	 * <p>See {@link #KEY_FORMAT} and {@link #KEY_FIELDS} for more information.
+	 */
+	public static int[] createKeyFormatProjection(ReadableConfig options, DataType physicalDataType) {
+		final LogicalType physicalType = physicalDataType.getLogicalType();
+		Preconditions.checkArgument(
+			hasRoot(physicalType, LogicalTypeRoot.ROW),
+			"Row data type expected.");
+		final Optional<String> optionalKeyFormat = options.getOptional(KEY_FORMAT);
+		final Optional<List<String>> optionalKeyFields = options.getOptional(KEY_FIELDS);
+
+		if (!optionalKeyFormat.isPresent() && optionalKeyFields.isPresent()) {
+			throw new ValidationException(
+					String.format(
+							"The option '%s' can only be declared if a key format is defined using '%s'.",
+							KEY_FIELDS.key(),
+							KEY_FORMAT.key()
+					)
+			);
+		} else if (optionalKeyFormat.isPresent() &&
+				(!optionalKeyFields.isPresent() || optionalKeyFields.get().size() == 0)) {
+			throw new ValidationException(
+					String.format(
+							"A key format '%s' requires the declaration of one or more of key fields using '%s'.",
+							KEY_FORMAT.key(),
+							KEY_FIELDS.key()
+					)
+			);
+		}
+
+		if (!optionalKeyFormat.isPresent()) {
+			return new int[0];
+		}
+
+		final List<String> keyFields = optionalKeyFields.get();
+		final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+		return keyFields.stream()
+				.mapToInt(keyField -> {
+					final int pos = physicalFields.indexOf(keyField);
+					if (pos < 0) {
+						throw new ValidationException(
+								String.format(
+										"Could not find the field '%s' in the table schema for usage in the key format. "
+												+ "A key field must be a regular, physical column. "
+												+ "The following columns can be selected in the '%s' option:\n"
+												+ "%s",
+										keyField,
+										KEY_FIELDS.key(),
+										physicalFields
+								)
+						);
+					}
+					return pos;
+				})
+			.toArray();
+	}
+
+	/**
+	 * Creates an array of indices that determine which physical fields of the table schema to include
+	 * in the value format.
+	 *
+	 * <p>See {@link #VALUE_FORMAT} and {@link #VALUE_FIELDS_INCLUDE} for more information.
+	 */
+	public static int[] createValueFormatProjection(ReadableConfig options, DataType physicalDataType) {

Review comment:
       We can add an overloaded method `createValueFormatProjection(ReadableConfig options, DataType physicalDataType, Optional<UniqueConstraint> primaryKey)` to reuse the code in this method.
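   One possible shape for that overload, sketched under the assumption that the two-argument method and `UniqueConstraint#getColumns()` stay as they are and the usual imports (`Arrays`, `Set`, `Collectors`) are available; a real implementation would also need to honor `value.fields-include`:
   ```java
   public static int[] createValueFormatProjection(
   		ReadableConfig options,
   		DataType physicalDataType,
   		Optional<UniqueConstraint> primaryKey) {
   	final int[] fullProjection = createValueFormatProjection(options, physicalDataType);
   	if (!primaryKey.isPresent()) {
   		return fullProjection;
   	}
   	// exclude the primary-key columns from the value projection when a key is defined
   	final List<String> physicalFields =
   		LogicalTypeChecks.getFieldNames(physicalDataType.getLogicalType());
   	final Set<Integer> keyPositions = primaryKey.get().getColumns().stream()
   		.map(physicalFields::indexOf)
   		.collect(Collectors.toSet());
   	return Arrays.stream(fullProjection)
   		.filter(pos -> !keyPositions.contains(pos))
   		.toArray();
   }
   ```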

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -589,7 +712,7 @@ public ReadableConfig getOptions() {
 
 		private String formatPrefix(Factory formatFactory, ConfigOption<String> formatOption) {
 			String identifier = formatFactory.factoryIdentifier();
-			if (formatOption.key().equals(FORMAT_KEY)) {
+			if (formatOption.key().equals(FORMAT_KEY) || formatOption.equals(KEY_FORMAT) || formatOption.equals(VALUE_FORMAT)) {

Review comment:
       We don't need to add this condition; it is already handled by the following condition.
   
   If the format key is prefixed with `key.` or `value.`, then all of that format's options should be prefixed with `key.` or `value.` as well, for example:
   
   ```
   'value.format' = 'csv',
   'value.csv.ignore-parse-error' = 'true'
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it partitions records by a hash of the key if a key is set, otherwise round-robin.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as sink only supports insert-only encoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validatePKConstraints(TableSchema schema) {
+		if (!schema.getPrimaryKey().isPresent()) {
+			throw new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.");
+		}
+
+		for (String key: schema.getPrimaryKey().get().getColumns()) {
+			if (!schema.getTableColumn(key).get().isPhysical()) {
+				throw new ValidationException(

Review comment:
       This will be validated by the planner, so we don't need to validate it again in the connector.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),

Review comment:
       Why not use `KafkaOptions#getKafkaProperties`?
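   i.e. something like the following (assuming the same `getKafkaProperties(Map<String, String>)` signature used by the existing 'kafka' factory):
   ```java
   final Properties properties = KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions());
   ```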

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it partitions records by a hash of the key if a key is set, otherwise round-robin.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)
+		);
+
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(PROPS_BOOTSTRAP_SERVERS);
+		options.add(FactoryUtil.KEY_FORMAT);
+		options.add(FactoryUtil.VALUE_FORMAT);
+		return options;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = new HashSet<>();
+		options.add(FactoryUtil.KEY_FIELDS_PREFIX);
+		options.add(KafkaOptions.TOPIC);
+		options.add(KafkaOptions.TOPIC_PATTERN);
+		options.add(FactoryUtil.VALUE_FIELDS_INCLUDE);
+		return options;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Validation
+	// --------------------------------------------------------------------------------------------
+
+	private static void validateTableSourceOptions(
+			ReadableConfig tableOptions,
+			DecodingFormat<DeserializationSchema<RowData>> keyFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSourceTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||
+				valueFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER)) {
+			throw new ValidationException(
+					"Currently 'upsert-kafka' connector as source only supports insert-only decoding format.");
+		}
+		validatePKConstraints(schema);
+	}
+
+	private static void validateTableSinkOptions(
+			ReadableConfig tableOptions,
+			EncodingFormat<SerializationSchema<RowData>> keyFormat,
+			EncodingFormat<SerializationSchema<RowData>> valueFormat,
+			TableSchema schema) {
+		KafkaOptions.validateSinkTopic(tableOptions);
+		if (keyFormat.getChangelogMode().contains(RowKind.UPDATE_AFTER) ||

Review comment:
       Extract the changelog mode validation logic above and reuse it here.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -280,8 +280,8 @@ private static void validateSinkSemantic(ReadableConfig tableOptions) {
 	// Utilities
 	// --------------------------------------------------------------------------------------------
 
-	public static KafkaSinkSemantic getSinkSemantic(ReadableConfig tableOptions){
-		switch (tableOptions.get(SINK_SEMANTIC)){
+	public static KafkaSinkSemantic getSinkSemantic(String semantic){

Review comment:
       We don't need to change this. 

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
##########
@@ -105,24 +122,37 @@
 
 	public KafkaDynamicSource(
 			DataType physicalDataType,
+			ChangelogMode changelogMode,
+			@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+			DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+			int[] keyProjection,
+			int[] valueProjection,
+			@Nullable String keyPrefix,
 			@Nullable List<String> topics,
 			@Nullable Pattern topicPattern,
 			Properties properties,
-			DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
 			StartupMode startupMode,
 			Map<KafkaTopicPartition, Long> specificStartupOffsets,
 			long startupTimestampMillis) {
+		// Format attributes
 		this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+		this.keyDecodingFormat = keyDecodingFormat;
+		this.valueDecodingFormat = Preconditions.checkNotNull(
+				valueDecodingFormat, "Value decoding format must not be null.");
+		this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+		this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+		this.keyPrefix = keyPrefix;
+		// Mutable attributes
 		this.producedDataType = physicalDataType;
 		this.metadataKeys = Collections.emptyList();
+		this.changelogMode = changelogMode;

Review comment:
       From my point of view, providing the `ChangelogMode` from the invoker is not safe.
   A more elegant way is for the table factory to provide wrappers around `DecodingFormat` and `EncodingFormat`, e.g. the upsert-kafka factory should provide a wrapper whose `DecodingFormat#getChangelogMode()` returns the upsert changelog mode. Then nothing needs to change in `KafkaDynamicSource` and `KafkaDynamicSink`.
   
   I think you need the wrapper anyway, because you need to change the RowKind from `INSERT` to `UPDATE_AFTER`.
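   A minimal sketch of such a wrapper on the decoding side (names illustrative; assumes the imports already present in `UpsertKafkaDynamicTableFactory` plus `org.apache.flink.table.types.DataType`); an analogous wrapper would be needed for `EncodingFormat`:
   ```java
   private static class UpsertDecodingFormatWrapper implements DecodingFormat<DeserializationSchema<RowData>> {
   
   	private final DecodingFormat<DeserializationSchema<RowData>> innerFormat;
   
   	UpsertDecodingFormatWrapper(DecodingFormat<DeserializationSchema<RowData>> innerFormat) {
   		this.innerFormat = innerFormat;
   	}
   
   	@Override
   	public DeserializationSchema<RowData> createRuntimeDecoder(
   			DynamicTableSource.Context context,
   			DataType producedDataType) {
   		// the runtime schema would additionally have to map INSERT records to UPDATE_AFTER
   		return innerFormat.createRuntimeDecoder(context, producedDataType);
   	}
   
   	@Override
   	public ChangelogMode getChangelogMode() {
   		// report upsert semantics instead of the inner (insert-only) format's mode
   		return ChangelogMode.newBuilder()
   			.addContainedKind(RowKind.UPDATE_AFTER)
   			.addContainedKind(RowKind.DELETE)
   			.build();
   	}
   }
   ```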

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -71,16 +88,45 @@ public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws Excepti
 		throw new IllegalStateException("A collector is required for deserializing.");
 	}
 
+	/**
+	 * There are four situations:
+	 * 1. value data only: the collector ignores deserialization errors;
+	 * 2. value data and metadata only:
+	 *   if the value fails to deserialize, the value decoding format ignores the record;
+	 *   otherwise the OutputProjectionCollector emits a row with a null key;
+	 * 3. key data and value data (no tombstone):
+	 *   if the key fails to deserialize but the value succeeds, the OutputProjectionCollector ignores the value data;
+	 *   if the key succeeds but the value fails, the value decoding format ignores the record;
+	 *   if both succeed, the OutputProjectionCollector emits a row with the key and value data.
+	 * 4. key data and value data (tombstone):
+	 *   if the key fails, the OutputProjectionCollector ignores the value data;
+	 *   otherwise it emits a row with the key and a null value.
+	 */
 	@Override
 	public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
+		// shortcut in case no output projection is required,
+		// also not for a cartesian product with the keys
+		if (keyDeserialization == null && !hasMetadata) {
 			valueDeserialization.deserialize(record.value(), collector);
+			return;
+		}
+
+		// buffer key(s)
+		if (keyDeserialization != null) {
+			keyDeserialization.deserialize(record.key(), keyCollector);
+		}
+
+		// project output while emitting values
+		outputCollector.inputRecord = record;
+		outputCollector.physicalKeyRows = keyCollector.buffer;
+		outputCollector.outputCollector = collector;
+		if (record.value() != null) {
+			valueDeserialization.deserialize(record.value(), outputCollector);
 		} else {
-			metadataAppendingCollector.inputRecord = record;
-			metadataAppendingCollector.outputCollector = collector;
-			valueDeserialization.deserialize(record.value(), metadataAppendingCollector);
+			// trigger collect by hand
+			outputCollector.collect(null);

Review comment:
       We should add a flag, e.g. `upsertMode=true`, to enable this behavior, because for the normal Kafka connector tombstone messages should be skipped.
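   Roughly guarded like this (`upsertMode` being the hypothetical new flag):
   ```java
   if (record.value() != null) {
   	valueDeserialization.deserialize(record.value(), outputCollector);
   } else if (upsertMode) {
   	// in upsert mode a tombstone (null value) becomes a DELETE row keyed by the record key
   	outputCollector.collect(null);
   }
   // otherwise (plain 'kafka' connector) the tombstone record is simply skipped
   ```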

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {

Review comment:
       Add `serialVersionUID`.
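   e.g. (the concrete value is up to the implementer):
   ```java
   private static final long serialVersionUID = 1L;
   ```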

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCaseUtils.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.utils.TableTestMatchers;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Utility methods for Kafka table ITCases.
+ */
+public class KafkaTableITCaseUtils {

Review comment:
       It would be better to call it `KafkaTableTestUtils`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+
+/**
+ * Upsert-Kafka factory.
+ */
+public class UpsertKafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+	public static final String IDENTIFIER = "upsert-kafka";
+
+	public static final ChangelogMode SOURCE_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	public static final ChangelogMode SINK_CHANGELOG_MODE = ChangelogMode.newBuilder()
+			.addContainedKind(RowKind.INSERT)
+			.addContainedKind(RowKind.UPDATE_AFTER)
+			.addContainedKind(RowKind.DELETE)
+			.build();
+
+	@Override
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = helper.discoverDecodingFormat(
+				DeserializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSourceOptions(
+				tableOptions,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// always use earliest to keep data integrity
+		final StartupMode earliest = StartupMode.EARLIEST;
+
+		return new KafkaDynamicSource(
+				schema.toPhysicalRowDataType(),
+				SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				KafkaOptions.getSourceTopics(tableOptions),
+				KafkaOptions.getSourceTopicPattern(tableOptions),
+				getBootstrapServers(tableOptions),
+				earliest,
+				Collections.emptyMap(),
+				0);
+	}
+
+	@Override
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+		ReadableConfig tableOptions = helper.getOptions();
+
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.KEY_FORMAT);
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = helper.discoverEncodingFormat(
+				SerializationFormatFactory.class,
+				FactoryUtil.VALUE_FORMAT);
+
+		// Validate the option data type.
+		helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+		TableSchema schema = context.getCatalogTable().getSchema();
+		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(schema);
+		List<String> physicalColumns = Arrays.asList(physicalSchema.getFieldNames());
+		validateTableSinkOptions(
+				tableOptions,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				schema);
+
+		int[] keyProjection = schema.getPrimaryKey().get().getColumns().stream()
+				.filter(name -> physicalSchema.getTableColumn(name).isPresent())
+				.mapToInt(physicalColumns::indexOf)
+				.toArray();
+		int[] valueProjection = getValueProjection(tableOptions, keyProjection, physicalColumns.size());
+		final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+		// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
+		// it will use hash partition if key is set else in round-robin behaviour.
+		final Optional<FlinkKafkaPartitioner<RowData>> partitioner = Optional.empty();
+
+		return new KafkaDynamicSink(
+				schema.toPhysicalRowDataType(),
+				SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				keyProjection,
+				valueProjection,
+				keyPrefix,
+				tableOptions.get(TOPIC).get(0),
+				getBootstrapServers(tableOptions),
+				partitioner,
+				getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)

Review comment:
       Hard-code this to `KafkaSinkSemantic.AT_LEAST_ONCE` instead of resolving it through `getSinkSemantic`.
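   
       A rough sketch of the constructor call with the semantic hard-coded (assuming `KafkaSinkSemantic` is the enum you are referring to):
   
    ```java
    		return new KafkaDynamicSink(
    				schema.toPhysicalRowDataType(),
    				SINK_CHANGELOG_MODE,
    				keyEncodingFormat,
    				valueEncodingFormat,
    				keyProjection,
    				valueProjection,
    				keyPrefix,
    				tableOptions.get(TOPIC).get(0),
    				getBootstrapServers(tableOptions),
    				partitioner,
    				// upsert-kafka only supports at-least-once delivery, so no option lookup is needed
    				KafkaSinkSemantic.AT_LEAST_ONCE);
    ```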

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link UpsertKafkaDynamicTableFactory}.
+ */
+public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
+
+	private static final String SOURCE_TOPICS = "sourceTopic_1;sourceTopic_2";
+
+	private static final List<String> SOURCE_TOPIC_LIST =
+			Arrays.asList("sourceTopic_1", "sourceTopic_2");
+
+	private static final String SINK_TOPIC = "sinkTopic";
+
+	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+			.field("window_start", new AtomicDataType(new VarCharType(false, 100)))
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))

Review comment:
       Use `DataTypes` to create the `DataType` instead of instantiating `AtomicDataType` and `VarCharType` directly.
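   
       For example (assuming the intent here is a non-nullable `VARCHAR(100)`, as in the current code):
   
    ```java
    	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
    			.field("window_start", DataTypes.VARCHAR(100).notNull())
    			.field("region", DataTypes.VARCHAR(100).notNull())
    			.field("view_count", DataTypes.BIGINT())
    			.primaryKey("window_start", "region")
    			.build();
    ```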

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+		private final List<RowData> buffer = new ArrayList<>();
+
+		@Override
+		public void collect(RowData record) {
+			buffer.add(record);
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits a row with key, value, and metadata fields.
+	 *
+	 * <p>The collector is able to handle the following kinds of keys:
+	 * <ul>
+	 *     <li>No key is used.
+	 *     <li>A key is used.
+	 *     <li>The deserialization schema emits multiple keys.
+	 * </ul>
+	 */
+	private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
 		private static final long serialVersionUID = 1L;
 
+		private final int[] keyProjection;
+
+		private final int[] valueProjection;
+
 		private final MetadataConverter[] metadataConverters;
 
 		private transient ConsumerRecord<?, ?> inputRecord;
 
+		private transient List<RowData> physicalKeyRows;
+
 		private transient Collector<RowData> outputCollector;
 
-		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+		OutputProjectionCollector(
+				int[] keyProjection,
+				int[] valueProjection,
+				MetadataConverter[] metadataConverters) {
+			this.keyProjection = keyProjection;
+			this.valueProjection = valueProjection;
 			this.metadataConverters = metadataConverters;
 		}
 
 		@Override
-		public void collect(RowData physicalRow) {
-			final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-			final int physicalArity = physicalRow.getArity();
+		public void collect(RowData physicalValueRow) {
+			// no key defined
+			if (keyProjection.length == 0) {
+				emitRow(null, (GenericRowData) physicalValueRow);
+				return;
+			}
+
+			// emit a value for each key
+			// if parsing key data gets problems, ignore the value data.
+			for (RowData physicalKeyRow : physicalKeyRows) {
+				if (physicalValueRow == null) {
+					physicalKeyRow.setRowKind(RowKind.DELETE);
+				}
+				emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+			}
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+
+		/**
+		 * There are 4 situations:
+		 * 1. key is null && value is not null => project and set rowkind = insert
+		 * 2. key is not null && value is not null => project and set rowkind = insert
+		 * 3. key is not null && value is null => project and set rowkind = delete
+		 * 4. key is null && value is null => impossible
+		 * This situation is impossible:
+		 *   keyProjection.length > 0 && key == null
+		 * */
+		private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
 			final int metadataArity = metadataConverters.length;
+			final int physicalArity = keyProjection.length + valueProjection.length;
+			final RowKind rowkind = physicalValueRow == null ? RowKind.DELETE : physicalValueRow.getRowKind();

Review comment:
       This should also only be applied when `upsertMode == true`.
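   
       Something along these lines, assuming the collector is given an `upsertMode` flag and that a null value row can only occur in upsert mode:
   
    ```java
    			// a null value row can only occur in upsert mode (Kafka tombstone record)
    			final RowKind rowkind = upsertMode && physicalValueRow == null
    					? RowKind.DELETE
    					: physicalValueRow.getRowKind();
    ```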

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);

Review comment:
       `keyRow` should always be projected with the `INSERT` RowKind.
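   
       i.e. project the key with a fixed `INSERT` kind instead of the consumed row's kind (using the existing `createProjectedRow` helper):
   
    ```java
    			// the key only identifies the record; it must not carry the changelog row kind
    			final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
    			keySerialized = keySerialization.serialize(keyRow);
    ```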

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+		private final List<RowData> buffer = new ArrayList<>();
+
+		@Override
+		public void collect(RowData record) {
+			buffer.add(record);
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits a row with key, value, and metadata fields.
+	 *
+	 * <p>The collector is able to handle the following kinds of keys:
+	 * <ul>
+	 *     <li>No key is used.
+	 *     <li>A key is used.
+	 *     <li>The deserialization schema emits multiple keys.
+	 * </ul>
+	 */
+	private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
 		private static final long serialVersionUID = 1L;
 
+		private final int[] keyProjection;
+
+		private final int[] valueProjection;
+
 		private final MetadataConverter[] metadataConverters;
 
 		private transient ConsumerRecord<?, ?> inputRecord;
 
+		private transient List<RowData> physicalKeyRows;
+
 		private transient Collector<RowData> outputCollector;
 
-		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+		OutputProjectionCollector(
+				int[] keyProjection,
+				int[] valueProjection,
+				MetadataConverter[] metadataConverters) {
+			this.keyProjection = keyProjection;
+			this.valueProjection = valueProjection;
 			this.metadataConverters = metadataConverters;
 		}
 
 		@Override
-		public void collect(RowData physicalRow) {
-			final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-			final int physicalArity = physicalRow.getArity();
+		public void collect(RowData physicalValueRow) {
+			// no key defined
+			if (keyProjection.length == 0) {
+				emitRow(null, (GenericRowData) physicalValueRow);
+				return;
+			}
+
+			// emit a value for each key
+			// if parsing key data gets problems, ignore the value data.
+			for (RowData physicalKeyRow : physicalKeyRows) {
+				if (physicalValueRow == null) {
+					physicalKeyRow.setRowKind(RowKind.DELETE);

Review comment:
       We don't need to set the `RowKind` on `keyRow`, because it is never used downstream.
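   
       The loop could then shrink to the following (no behavior change, just dropping the unused `setRowKind` call):
   
    ```java
    			// emit one output row per buffered key; the key's row kind is never read downstream
    			for (RowData physicalKeyRow : physicalKeyRows) {
    				emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
    			}
    ```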

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+			keySerialized = keySerialization.serialize(keyRow);
 		}
 
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+		final byte[] valueSerialized;
+		if (kind == RowKind.DELETE) {
+			valueSerialized = null;
+		} else {
+			final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+			valueSerialized = valueSerialization.serialize(valueRow);

Review comment:
       This should only be done when `upsertMode == true`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+			keySerialized = keySerialization.serialize(keyRow);
 		}
 
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+		final byte[] valueSerialized;
+		if (kind == RowKind.DELETE) {

Review comment:
       The condition should be `kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE` (and, as noted above, only when `upsertMode == true`).
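   
       A sketch, assuming the serialization schema also carries an `upsertMode` flag:
   
    ```java
    		final byte[] valueSerialized;
    		if (upsertMode && (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE)) {
    			// in upsert mode, deletions are written as Kafka tombstones (null value)
    			valueSerialized = null;
    		} else {
    			final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
    			valueSerialized = valueSerialization.serialize(valueRow);
    		}
    ```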

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link UpsertKafkaDynamicTableFactory}.
+ */
+public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
+
+	private static final String SOURCE_TOPICS = "sourceTopic_1;sourceTopic_2";
+
+	private static final List<String> SOURCE_TOPIC_LIST =
+			Arrays.asList("sourceTopic_1", "sourceTopic_2");
+
+	private static final String SINK_TOPIC = "sinkTopic";
+
+	private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+			.field("window_start", new AtomicDataType(new VarCharType(false, 100)))
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))
+			.field("view_count", DataTypes.BIGINT())
+			.primaryKey("window_start", "region")
+			.build();
+
+	private static final int[] SOURCE_KEY_FIELDS = new int[]{0, 1};
+
+	private static final int[] SOURCE_VALUE_FIELDS = new int[]{0, 1, 2};
+
+	private static final TableSchema SINK_SCHEMA = TableSchema.builder()
+			.field("region", new AtomicDataType(new VarCharType(false, 100)))
+			.field("view_count", DataTypes.BIGINT())
+			.primaryKey("region")
+			.build();
+
+	private static final int[] SINK_KEY_FIELDS = new int[]{0};
+
+	private static final int[] SINK_VALUE_FIELDS = new int[]{0, 1};
+
+	private static final Properties UPSERT_KAFKA_SOURCE_PROPERTIES = new Properties();
+	private static final Properties UPSERT_KAFKA_SINK_PROPERTIES = new Properties();
+
+	static {
+		UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+
+		UPSERT_KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+	}
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testTableSource() {
+		final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+		DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat =
+				new TestFormatFactory.DecodingFormatMock(
+						",", true, ChangelogMode.insertOnly());
+
+		DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+				new TestFormatFactory.DecodingFormatMock(
+						",", true, ChangelogMode.insertOnly());
+
+		// Construct table source using options and table source factory
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sourceTable");
+		CatalogTable catalogTable =
+				new CatalogTableImpl(SOURCE_SCHEMA, getFullSourceOptions(), "sourceTable");
+		final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		final KafkaDynamicSource expectedSource = new KafkaDynamicSource(
+				producedDataType,
+				UpsertKafkaDynamicTableFactory.SOURCE_CHANGELOG_MODE,
+				keyDecodingFormat,
+				valueDecodingFormat,
+				SOURCE_KEY_FIELDS,
+				SOURCE_VALUE_FIELDS,
+				null,
+				SOURCE_TOPIC_LIST,
+				null,
+				UPSERT_KAFKA_SOURCE_PROPERTIES,
+				StartupMode.EARLIEST,
+				Collections.emptyMap(),
+				0);
+		assertEquals(actualSource, expectedSource);
+
+		final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource;
+		ScanTableSource.ScanRuntimeProvider provider =
+				actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+		assertThat(provider, instanceOf(SourceFunctionProvider.class));
+		final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+		final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+		assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
+	}
+
+	@Test
+	public void testTableSink() {
+		final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
+		EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat =
+				new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+
+		EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+				new TestFormatFactory.EncodingFormatMock(",", ChangelogMode.insertOnly());
+
+		// Construct table sink using options and table sink factory.
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sinkTable");
+		final CatalogTable sinkTable =
+				new CatalogTableImpl(SINK_SCHEMA, getFullSinkOptions(), "sinkTable");
+		final DynamicTableSink actualSink = FactoryUtil.createTableSink(
+				null,
+				objectIdentifier,
+				sinkTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+
+		final DynamicTableSink expectedSink = new KafkaDynamicSink(
+				consumedDataType,
+				UpsertKafkaDynamicTableFactory.SINK_CHANGELOG_MODE,
+				keyEncodingFormat,
+				valueEncodingFormat,
+				SINK_KEY_FIELDS,
+				SINK_VALUE_FIELDS,
+				null,
+				SINK_TOPIC,
+				UPSERT_KAFKA_SINK_PROPERTIES,
+				Optional.empty(),
+				KafkaOptions.getSinkSemantic(KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE));
+
+		// Test sink format.
+		final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
+		assertEquals(valueEncodingFormat, actualUpsertKafkaSink.valueEncodingFormat);
+		assertEquals(keyEncodingFormat, actualUpsertKafkaSink.keyEncodingFormat);
+		assertEquals(expectedSink, actualSink);
+
+		// Test kafka producer.
+		DynamicTableSink.SinkRuntimeProvider provider =
+				actualUpsertKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+		assertThat(provider, instanceOf(SinkFunctionProvider.class));
+		final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider;
+		final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction();
+		assertThat(sinkFunction, instanceOf(FlinkKafkaProducer.class));
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Negative tests
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testCreateSourceTableWithoutPK() {
+		TableSchema illegalSchema = TableSchema.builder()
+				.field("window_start", DataTypes.STRING())
+				.field("region", DataTypes.STRING())
+				.field("view_count", DataTypes.BIGINT())
+				.build();
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sourceTable");
+		CatalogTable catalogTable = new CatalogTableImpl(illegalSchema, getFullSourceOptions(), "sourceTable");
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.")));
+
+		FactoryUtil.createTableSource(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);
+	}
+
+	@Test
+	public void testCreateSinkTableWithoutPK() {
+		TableSchema illegalSchema = TableSchema.builder()
+				.field("region", DataTypes.STRING())
+				.field("view_count", DataTypes.BIGINT())
+				.build();
+		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+				"default",
+				"default",
+				"sinkTable");
+		CatalogTable catalogTable = new CatalogTableImpl(illegalSchema, getFullSinkOptions(), "sinkTable");
+
+		thrown.expect(ValidationException.class);
+		thrown.expect(containsCause(new ValidationException("PK constraints must be defined in the 'upsert-kafka' connector.")));
+
+		FactoryUtil.createTableSink(null,
+				objectIdentifier,
+				catalogTable,
+				new Configuration(),
+				Thread.currentThread().getContextClassLoader(),
+				false);

Review comment:
       I think we can factor the verbose table sink/source creation code into reusable helpers, e.g.
   
   ```java
   	private static DynamicTableSink createTableSink(TableSchema schema, Map<String, String> options) {
   		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
   			"default",
   			"default",
   			"testTable");
   		CatalogTable catalogTable = new CatalogTableImpl(
   			schema,
   			options,
   			"testTable");
   		return FactoryUtil.createTableSink(null,
   			objectIdentifier,
   			catalogTable,
   			new Configuration(),
   			Thread.currentThread().getContextClassLoader(),
   			false);
   	}
   ```
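   
       A matching helper for the source side (same pattern, reusing the `FactoryUtil.createTableSource` call that the tests already use) would keep the negative tests just as short:
   
    ```java
    	private static DynamicTableSource createTableSource(TableSchema schema, Map<String, String> options) {
    		ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
    			"default",
    			"default",
    			"testTable");
    		CatalogTable catalogTable = new CatalogTableImpl(
    			schema,
    			options,
    			"testTable");
    		return FactoryUtil.createTableSource(null,
    			objectIdentifier,
    			catalogTable,
    			new Configuration(),
    			Thread.currentThread().getContextClassLoader(),
    			false);
    	}
    ```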




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 45845058aa91089037420a3dea07901497e645d4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033) 
   * 726b4e0a7a34dfd82e09cf9d04c98d722bb12297 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 73ecb300a223c19e23506332846f40a531d8e723 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645) 
   * 16c0c99d7d74516d0d89899f1f868f436a669182 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fsk119 commented on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
fsk119 commented on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-722079607


   Thanks for your help @wuchong ! The changes look very good.  
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fb13ae80b253b874698f51ab065dc9438251b414 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779) 
   * d3a16055c657e05d255e37ab3711685cfc13efca Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719156326


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 73ecb300a223c19e23506332846f40a531d8e723 (Fri Oct 30 03:55:44 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
     The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008) 
   * 45845058aa91089037420a3dea07901497e645d4 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043",
       "triggerID" : "726b4e0a7a34dfd82e09cf9d04c98d722bb12297",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8e4ebec4831e4e2800adb10eb12980869214674a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 45845058aa91089037420a3dea07901497e645d4 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9033) 
   * 726b4e0a7a34dfd82e09cf9d04c98d722bb12297 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9043) 
   * 8e4ebec4831e4e2800adb10eb12980869214674a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3a16055c657e05d255e37ab3711685cfc13efca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941) 
   * ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * d3a16055c657e05d255e37ab3711685cfc13efca Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941) 
   * ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] fsk119 commented on a change in pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
fsk119 commented on a change in pull request #13850:
URL: https://github.com/apache/flink/pull/13850#discussion_r515758607



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
 	// --------------------------------------------------------------------------------------------
 
-	private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+	private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+		private final List<RowData> buffer = new ArrayList<>();
+
+		@Override
+		public void collect(RowData record) {
+			buffer.add(record);
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits a row with key, value, and metadata fields.
+	 *
+	 * <p>The collector is able to handle the following kinds of keys:
+	 * <ul>
+	 *     <li>No key is used.
+	 *     <li>A key is used.
+	 *     <li>The deserialization schema emits multiple keys.
+	 * </ul>
+	 */
+	private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
 		private static final long serialVersionUID = 1L;
 
+		private final int[] keyProjection;
+
+		private final int[] valueProjection;
+
 		private final MetadataConverter[] metadataConverters;
 
 		private transient ConsumerRecord<?, ?> inputRecord;
 
+		private transient List<RowData> physicalKeyRows;
+
 		private transient Collector<RowData> outputCollector;
 
-		MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+		OutputProjectionCollector(
+				int[] keyProjection,
+				int[] valueProjection,
+				MetadataConverter[] metadataConverters) {
+			this.keyProjection = keyProjection;
+			this.valueProjection = valueProjection;
 			this.metadataConverters = metadataConverters;
 		}
 
 		@Override
-		public void collect(RowData physicalRow) {
-			final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-			final int physicalArity = physicalRow.getArity();
+		public void collect(RowData physicalValueRow) {
+			// no key defined
+			if (keyProjection.length == 0) {
+				emitRow(null, (GenericRowData) physicalValueRow);
+				return;
+			}
+
+			// emit a value for each key
+			// if parsing key data gets problems, ignore the value data.
+			for (RowData physicalKeyRow : physicalKeyRows) {
+				if (physicalValueRow == null) {
+					physicalKeyRow.setRowKind(RowKind.DELETE);
+				}
+				emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+			}
+		}
+
+		@Override
+		public void close() {
+			// nothing to do
+		}
+
+		/**
+		 * There are 4 situations:
+		 * 1. key is null && value is not null => project and set rowkind = insert
+		 * 2. key is not null && value is not null => project and set rowkind = insert
+		 * 3. key is not null && value is null => project and set rowkind = delete
+		 * 4. key is null && value is null => impossible
+		 * This situation is impossible:
+		 *   keyProjection.length > 0 && key == null
+		 * */
+		private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
 			final int metadataArity = metadataConverters.length;
+			final int physicalArity = keyProjection.length + valueProjection.length;
+			final RowKind rowkind = physicalValueRow == null ? RowKind.DELETE : physicalValueRow.getRowKind();

Review comment:
       It is an inner collector. It only receives the tombstone when the outer class `DynamicKafkaDeserializationSchema` decides to emit the row. Therefore, it doesn't need a flag to detect the deserialization mode.
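
       For illustration, here is a minimal sketch (Java) of the projection performed when the collector emits a row. It is not the code from this PR: the class and method names are hypothetical, the metadata converters are omitted, and only the `keyProjection`/`valueProjection` arrays and the "null value row means DELETE" rule follow the diff above.

       import org.apache.flink.table.data.GenericRowData;
       import org.apache.flink.types.RowKind;

       import javax.annotation.Nullable;

       /** Illustrative sketch only; not part of the PR. */
       public final class KeyValueProjectionSketch {

           /**
            * Builds the produced row from the deserialized key and value rows.
            * A null value row (a Kafka tombstone) yields a DELETE row carrying only the key fields.
            * Assumes keyRow is non-null whenever keyProjection is non-empty, as the diff's Javadoc notes.
            */
           static GenericRowData emitRow(
                   int[] keyProjection,
                   int[] valueProjection,
                   @Nullable GenericRowData keyRow,
                   @Nullable GenericRowData valueRow) {

               final int arity = keyProjection.length + valueProjection.length;
               final RowKind kind = valueRow == null ? RowKind.DELETE : valueRow.getRowKind();
               final GenericRowData producedRow = new GenericRowData(kind, arity);

               // keyProjection[i] is the target position of the i-th key field in the produced row
               for (int i = 0; i < keyProjection.length; i++) {
                   producedRow.setField(keyProjection[i], keyRow.getField(i));
               }
               // value fields are copied only for non-tombstone records; otherwise they stay null
               if (valueRow != null) {
                   for (int i = 0; i < valueProjection.length; i++) {
                       producedRow.setField(valueProjection[i], valueRow.getField(i));
                   }
               }
               return producedRow;
           }
       }

       Shaped this way, the collector itself needs no upsert flag: the behaviour is fully determined by whether the value row is present, which matches the point made above.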

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
 	@Override
 	public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-		final RowData physicalRow;
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			physicalRow = consumedRow;
-		} else {
-			final int physicalArity = physicalFieldGetters.length;
-			final GenericRowData genericRowData = new GenericRowData(
-					consumedRow.getRowKind(),
-					physicalArity);
-			for (int i = 0; i < physicalArity; i++) {
-				genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-			}
-			physicalRow = genericRowData;
+		// shortcut in case no input projection is required
+		if (keySerialization == null && !hasMetadata) {
+			final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+			return new ProducerRecord<>(
+					topic,
+					extractPartition(consumedRow, null, valueSerialized),
+					null,
+					valueSerialized);
 		}
 
-		final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-		final Integer partition;
-		if (partitioner != null) {
-			partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+		final byte[] keySerialized;
+		final RowKind kind = consumedRow.getRowKind();
+		if (keySerialization == null) {
+			keySerialized = null;
 		} else {
-			partition = null;
+			final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+			keySerialized = keySerialization.serialize(keyRow);
 		}
 
-		// shortcut if no metadata is required
-		if (!hasMetadata) {
-			return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+		final byte[] valueSerialized;
+		if (kind == RowKind.DELETE) {
+			valueSerialized = null;
+		} else {
+			final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+			valueSerialized = valueSerialization.serialize(valueRow);

Review comment:
       If upsert mode is true, we should transform delete/update-before messages into tombstone messages. But I am not sure what the behaviour of `DynamicKafkaSerializationSchema` should be, since it has no means to discard a row it receives. If both should behave the same way, I think it is not useful to introduce a flag.
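
       For illustration, a hedged sketch (Java) of the tombstone handling under discussion, not the actual behaviour of `DynamicKafkaSerializationSchema`: the `upsertMode` flag, class name, and method signature are hypothetical and only show how delete/update-before rows could be written as Kafka tombstones while other row kinds serialize the value normally.

       import org.apache.flink.api.common.serialization.SerializationSchema;
       import org.apache.flink.table.data.RowData;
       import org.apache.flink.types.RowKind;

       import org.apache.kafka.clients.producer.ProducerRecord;

       /** Illustrative sketch only; not part of the PR. */
       public final class UpsertTombstoneSketch {

           /**
            * Maps a changelog row to a Kafka record. In upsert mode, DELETE and UPDATE_BEFORE rows
            * become tombstones (key with a null value); all other rows serialize the value payload.
            */
           static ProducerRecord<byte[], byte[]> toProducerRecord(
                   String topic,
                   boolean upsertMode,
                   SerializationSchema<RowData> keySerialization,
                   SerializationSchema<RowData> valueSerialization,
                   RowData keyRow,
                   RowData valueRow) {

               final byte[] keyBytes = keySerialization.serialize(keyRow);

               final RowKind kind = valueRow.getRowKind();
               final byte[] valueBytes;
               if (upsertMode && (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE)) {
                   // log compaction on the key removes the row downstream; no value payload is written
                   valueBytes = null;
               } else {
                   valueBytes = valueSerialization.serialize(valueRow);
               }
               return new ProducerRecord<>(topic, keyBytes, valueBytes);
           }
       }

       Note that such a serializer still emits one record per input row rather than discarding any; whether update-before rows should instead be dropped entirely is exactly the open question raised above.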




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13850: [FLINK-19858]Intorduce the upsert-kafka table factory

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13850:
URL: https://github.com/apache/flink/pull/13850#issuecomment-719162027


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "73ecb300a223c19e23506332846f40a531d8e723",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8645",
       "triggerID" : "73ecb300a223c19e23506332846f40a531d8e723",
       "triggerType" : "PUSH"
     }, {
       "hash" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8648",
       "triggerID" : "16c0c99d7d74516d0d89899f1f868f436a669182",
       "triggerType" : "PUSH"
     }, {
       "hash" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8722",
       "triggerID" : "3d595070a5d7ff863daeae6d8e070f7195304636",
       "triggerType" : "PUSH"
     }, {
       "hash" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8726",
       "triggerID" : "99ee2aec1f35053f3ac2f45e4040df21fb8df4c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8779",
       "triggerID" : "fb13ae80b253b874698f51ab065dc9438251b414",
       "triggerType" : "PUSH"
     }, {
       "hash" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8941",
       "triggerID" : "d3a16055c657e05d255e37ab3711685cfc13efca",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008",
       "triggerID" : "ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "45845058aa91089037420a3dea07901497e645d4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "45845058aa91089037420a3dea07901497e645d4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ab7d66c98efe2cbae4d7c1f1e699df685b8d9bb1 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=9008) 
   * 45845058aa91089037420a3dea07901497e645d4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org