Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/18 23:10:02 UTC

[GitHub] [flink] guoweiM opened a new pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

guoweiM opened a new pull request #13678:
URL: https://github.com/apache/flink/pull/13678


   <!--
   *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
     
     - Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
      Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Azure Pipelines CI to do that following [this guide](https://cwiki.apache.org/confluence/display/FLINK/Azure+Pipelines#AzurePipelines-Tutorial:SettingupAzurePipelinesforaforkoftheFlinkrepository).
   
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message (including the JIRA id)
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   This patch introduces two streaming committer operators:
   1. `StreamingCommitterOperator` is the runtime operator for the `Committer` in the new Sink API.
   2. `GlobalStreamingCommitterOperator` is the runtime operator for the `GlobalCommitter` in the new Sink API.
   
   `AbstractStreamingCommitterOperator` is the base class of these two operators.
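   A committer operator of this kind typically buffers committables per checkpoint and only hands them to the `Committer` once that checkpoint completes. A minimal, self-contained sketch of that pattern (names and structure are illustrative assumptions, not the actual Flink classes):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical sketch of the commit-on-checkpoint pattern a streaming
   // committer operator follows; not the real AbstractStreamingCommitterOperator.
   class CommitterSketch<CommT> {
       // Committables received since the last checkpoint barrier.
       private final List<CommT> current = new ArrayList<>();
       // Committables buffered per checkpoint id, committed once that checkpoint completes.
       private final NavigableMap<Long, List<CommT>> pending = new TreeMap<>();

       void processElement(CommT committable) {
           current.add(committable);
       }

       void snapshotState(long checkpointId) {
           // Move the in-flight committables into the per-checkpoint buffer.
           pending.put(checkpointId, new ArrayList<>(current));
           current.clear();
       }

       // Returns the committables that are now safe to commit: everything
       // buffered for checkpoints up to and including the completed one.
       List<CommT> notifyCheckpointComplete(long checkpointId) {
           List<CommT> readyToCommit = new ArrayList<>();
           pending.headMap(checkpointId, true).values().forEach(readyToCommit::addAll);
           pending.headMap(checkpointId, true).clear();
           return readyToCommit;
       }
   }
   ```

   The point of the per-checkpoint buffering is exactly-once semantics: a committable is only committed after the checkpoint that produced it is durable, so a failure before that point replays rather than duplicates it.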
   
   ## Brief change log
   
   1. Introduce `AbstractStreamingCommitterOperator` as the base class of the committer operators.
   1. Introduce `StreamingCommitterOperator` for the local committer.
   1. Introduce `GlobalStreamingCommitterOperator` for the global committer.
   1. Introduce `StreamingCommitterState` and `StreamingCommitterStateSerializer`.
   1. Make `TestSink` the factory for all test sinks.
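   The committer state mentioned above is essentially the list of not-yet-committed committables, and its serializer needs to be versioned so the on-disk format can evolve across Flink versions. A hedged sketch of what such a versioned state serializer might look like (layout and names are assumptions for illustration, not the actual `StreamingCommitterStateSerializer`):

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical versioned serializer for committer state: a version tag is
   // written first, then the list of committables.
   class StateSerializerSketch {
       static final int VERSION = 1;

       byte[] serialize(List<String> committables) {
           try {
               ByteArrayOutputStream bytes = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(bytes);
               out.writeInt(VERSION);                 // format version, checked on read
               out.writeInt(committables.size());
               for (String c : committables) {
                   out.writeUTF(c);
               }
               return bytes.toByteArray();
           } catch (IOException e) {
               throw new RuntimeException(e);         // cannot happen for in-memory streams
           }
       }

       List<String> deserialize(byte[] data) {
           try {
               DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
               int version = in.readInt();
               if (version != VERSION) {
                   throw new IOException("Unsupported state version: " + version);
               }
               int size = in.readInt();
               List<String> result = new ArrayList<>(size);
               for (int i = 0; i < size; i++) {
                   result.add(in.readUTF());
               }
               return result;
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
       }
   }
   ```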
   
   
   ## Verifying this change
   
   1. `StreamingCommitterTestBase` tests the `open/process/pre-snapshot/snapshot/close/restore` behavior of the two operators.
   2. `GlobalStreamingCommitterOperatorTest` tests `filterRecoveredCommittables/endOfInput` of the `GlobalStreamingCommitterOperator`.
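   To make the two global-committer-specific behaviors concrete: on restore, committables recovered from state may already have been committed to the external system and must be filtered out before retrying; on bounded input, `endOfInput` triggers a final commit of anything still pending. A simplified, self-contained sketch (names and the string-based committables are assumptions, not the real API):

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical sketch of filterRecoveredCommittables/endOfInput semantics;
   // the Set stands in for state queried from the external system.
   class GlobalCommitterSketch {
       private final Set<String> externallyCommitted = new HashSet<>();
       private final List<String> pending = new ArrayList<>();

       // On restore: retry only what the external system has not seen yet.
       List<String> filterRecoveredCommittables(List<String> recovered) {
           List<String> needRetry = new ArrayList<>(recovered);
           needRetry.removeAll(externallyCommitted);
           return needRetry;
       }

       void processElement(String committable) {
           pending.add(committable);
       }

       // On end of a bounded input: commit everything still pending.
       void endOfInput() {
           externallyCommitted.addAll(pending);
           pending.clear();
       }

       boolean isCommitted(String id) {
           return externallyCommitted.contains(id);
       }
   }
   ```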
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   * 4873fb92710324b9d47682dcdfb707815d71bfc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-712273675


   Thanks @kl0u and @gaoyunhaii .





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8055",
       "triggerID" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 966c1aca195e5f2af7f32a485a48e0f5eebb35d0 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8055) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8055",
       "triggerID" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076",
       "triggerID" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "9d7e355abedbffb48ef6fa865a3606148d2207d2",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8148",
       "triggerID" : "9d7e355abedbffb48ef6fa865a3606148d2207d2",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 9d7e355abedbffb48ef6fa865a3606148d2207d2 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8148) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8055",
       "triggerID" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076",
       "triggerID" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 2798c88d1f869baf79e35af9ae67ac954f636857 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-714246544


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4873fb92710324b9d47682dcdfb707815d71bfc3 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892) 
   * 82614f698d02d8ce751d0fad7820b85c12e54fff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1816d296b693f771d3a5e0cf417bd1f30f302aad Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955) 
   * fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507873450



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The current pending global committables. */
+	private List<GlobalCommT> currentGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = globalCommitter;
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		this.currentGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);
+	}
+
+	@Override
+	List<GlobalCommT> preCommit() {
+		final List<GlobalCommT> result = computeCurrentGlobalCommittables();
+		currentInputs = new ArrayList<>();
+		currentGlobalCommittables = new ArrayList<>();
+		return result;
+	}
+
+	@Override
+	public void endInput() {
+		endOfInput = true;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		if (endOfInput) {
+			globalCommitter.endOfInput();
+		}
+	}
+
+	private List<GlobalCommT> computeCurrentGlobalCommittables() {
+		final List<GlobalCommT> result = new ArrayList<>();

Review comment:
       I removed this method.







[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 44eb4690cd88de72a900fbf6eebbcddb38f5e684 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816",
       "triggerID" : "44eb4690cd88de72a900fbf6eebbcddb38f5e684",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876",
       "triggerID" : "b31d0fdb8b5336a91231fad0db55afc072400932",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892",
       "triggerID" : "4873fb92710324b9d47682dcdfb707815d71bfc3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904",
       "triggerID" : "82614f698d02d8ce751d0fad7820b85c12e54fff",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955",
       "triggerID" : "1816d296b693f771d3a5e0cf417bd1f30f302aad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038",
       "triggerID" : "fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051",
       "triggerID" : "2798c88d1f869baf79e35af9ae67ac954f636857",
       "triggerType" : "PUSH"
     }, {
       "hash" : "",
       "status" : "DELETED",
       "url" : "TBD",
       "triggerID" : "714048464",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8055",
       "triggerID" : "966c1aca195e5f2af7f32a485a48e0f5eebb35d0",
       "triggerType" : "PUSH"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "",
       "status" : "CANCELED",
       "url" : "TBD",
       "triggerID" : "714246544",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "463b5c8ed21f93caaeb7b938aa9e72abb35619b2",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507813676



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =

Review comment:
       This can be `private`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;
+
+	/**
+	 * Notify a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepare a commit.
+	 *
+	 * @return A list of committables that could be committed in the following checkpoint complete.
+	 */
+	abstract List<CommT> preCommit();
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = committer;

Review comment:
       What about putting a `checkNotNull(committer)` to guarantee the invariants?
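
   The point of the `checkNotNull` pattern suggested here is to fail fast at construction time instead of surfacing as a late `NullPointerException` inside `snapshotState()`. A minimal, Flink-free sketch of the guarded constructor — using `java.util.Objects.requireNonNull` as a stand-in for Flink's `Preconditions.checkNotNull`, with a simplified `Committer` interface; all names below are illustrative, not the actual Flink API:

```java
import java.util.Objects;

// Stand-in for Flink's Committer interface (simplified for illustration).
interface Committer<CommT> {
    void commit(CommT committable) throws Exception;
}

// GuardedCommitterOperator is a made-up name for this sketch.
class GuardedCommitterOperator<CommT> {
    private final Committer<CommT> committer;

    GuardedCommitterOperator(Committer<CommT> committer) {
        // Fail fast here rather than with a late NPE during a checkpoint.
        this.committer = Objects.requireNonNull(committer, "committer must not be null");
    }
}
```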

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;
+
+	/**
+	 * Notify a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepare a commit.
+	 *
+	 * @return A list of committables that could be committed in the following checkpoint complete.
+	 */
+	abstract List<CommT> preCommit();
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = committer;
+		this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
+				committableSerializer);
+		this.committablesPerCheckpoint = new TreeMap<>();
+		this.currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		streamingCommitterState = new SimpleVersionedListState<>(
+				context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+				streamingCommitterStateSerializer);
+		final List<CommT> restored = new ArrayList<>();
+		streamingCommitterState.get().forEach(s -> restored.addAll(s.getCommittables()));
+		recoveredCommittables(restored);
+	}
+
+	@Override
+	public void processElement(StreamRecord<InputT> element) throws Exception {
+		currentInputs.add(element.getValue());
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		committer.close();
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		committablesPerCheckpoint.put(context.getCheckpointId(), preCommit());
+		streamingCommitterState.update(
+				Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));

Review comment:
       With the change that @gaoyunhaii proposed, here we will also be able to clear the `currentInputs`. Something like:
   
   ```
   committablesPerCheckpoint.put(context.getCheckpointId(), preCommit(currentInputs));
   streamingCommitterState.update(
   				Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));
   currentInputs = new ArrayList<>();
   ```
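
   Put together, the snapshot path this suggestion describes — pre-commit the buffered inputs, record the result under the checkpoint id, then reset the buffer — can be sketched without any Flink dependencies. All class and member names below are simplified stand-ins, not the real operator API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;

// Flink-free sketch of the snapshot path under discussion.
class SnapshotSketch<InputT, CommT> {
    private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint = new TreeMap<>();
    private final Function<List<InputT>, List<CommT>> preCommit;
    // Kept private, as the reviewers suggest; only snapshotState() hands it out.
    private List<InputT> currentInputs = new ArrayList<>();

    SnapshotSketch(Function<List<InputT>, List<CommT>> preCommit) {
        this.preCommit = preCommit;
    }

    void processElement(InputT value) {
        currentInputs.add(value);
    }

    void snapshotState(long checkpointId) {
        // Pre-commit everything buffered since the last checkpoint...
        committablesPerCheckpoint.put(checkpointId, preCommit.apply(currentInputs));
        // ...then reset the buffer so the same inputs are never pre-committed twice.
        currentInputs = new ArrayList<>();
    }

    NavigableMap<Long, List<CommT>> pending() {
        return committablesPerCheckpoint;
    }
}
```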

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The current pending global committables. */
+	private List<GlobalCommT> currentGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = globalCommitter;

Review comment:
       Again maybe add a `checkNotNull` for safety.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The current pending global committables. */
+	private List<GlobalCommT> currentGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = globalCommitter;
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		this.currentGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);
+	}
+
+	@Override
+	List<GlobalCommT> preCommit() {
+		final List<GlobalCommT> result = computeCurrentGlobalCommittables();
+		currentInputs = new ArrayList<>();
+		currentGlobalCommittables = new ArrayList<>();
+		return result;
+	}
+
+	@Override
+	public void endInput() {
+		endOfInput = true;
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		if (endOfInput) {
+			globalCommitter.endOfInput();
+		}
+	}
+
+	private List<GlobalCommT> computeCurrentGlobalCommittables() {
+		final List<GlobalCommT> result = new ArrayList<>();

Review comment:
       This could become: 
   
   ```
    final List<GlobalCommT> result = new ArrayList<>(currentGlobalCommittables);
    if (!pendingCommittables.isEmpty()) {
        final GlobalCommT globalCommittable = globalCommitter.combine(currentInputs); // this can change from previous comments
        result.add(globalCommittable);
    }
    currentGlobalCommittables = new ArrayList<>();
   ```
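
   The snippet above mixes the names `pendingCommittables` and `currentInputs`. Assuming both refer to the committables buffered since the last checkpoint, a harmonized, Flink-free sketch of the proposed global pre-commit looks like the following — the names are simplified stand-ins for the real `GlobalCommitter` plumbing, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Flink-free sketch of the global pre-commit proposed above.
class GlobalPreCommitSketch<CommT, GlobalCommT> {
    private final Function<List<CommT>, GlobalCommT> combine;
    private List<GlobalCommT> currentGlobalCommittables = new ArrayList<>();

    GlobalPreCommitSketch(Function<List<CommT>, GlobalCommT> combine) {
        this.combine = combine;
    }

    // Committables that survived the recovery-time filtering step.
    void recovered(List<GlobalCommT> committables) {
        currentGlobalCommittables = new ArrayList<>(committables);
    }

    List<GlobalCommT> preCommit(List<CommT> currentInputs) {
        // Start from any recovered-but-uncommitted global committables...
        final List<GlobalCommT> result = new ArrayList<>(currentGlobalCommittables);
        // ...and fold the freshly buffered inputs into one new global committable.
        if (!currentInputs.isEmpty()) {
            result.add(combine.apply(currentInputs));
        }
        currentGlobalCommittables = new ArrayList<>();
        return result;
    }
}
```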

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;

Review comment:
       Big +1 on the comment from @gaoyunhaii to make `currentInputs` `private` and pass it as an argument to `preCommit`.  

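A minimal sketch of that suggestion, with the buffer kept private and handed to `preCommit` explicitly (the class shape and the exact `preCommit` signature here are illustrative assumptions, not the PR's final API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, trimmed-down committer operator illustrating the review idea:
// the input buffer stays private, and sub-classes only ever see it as a
// preCommit() argument.
abstract class CommitterSketch<InputT, CommT> {

    // Private instead of protected: sub-classes never touch the buffer directly.
    private List<InputT> currentInputs = new ArrayList<>();

    void processElement(InputT element) {
        currentInputs.add(element);
    }

    // Called on checkpoint: hand the buffered inputs over and start a fresh buffer.
    List<CommT> snapshot() {
        List<InputT> inputs = currentInputs;
        currentInputs = new ArrayList<>();
        return preCommit(Collections.unmodifiableList(inputs));
    }

    // Sub-classes receive the inputs explicitly rather than reading a shared field.
    abstract List<CommT> preCommit(List<InputT> inputs);
}
```

With this shape a sub-class cannot forget to clear the buffer or observe it while it is being mutated.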
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterState.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+
+/**
+ * The state of the {@link AbstractStreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+final class StreamingCommitterState<CommT> {
+
+	private final List<CommT> committables;
+
+	StreamingCommitterState(List<CommT> committables) {
+		this.committables = committables;

Review comment:
       `checkNotNull`?

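In Flink code this would be `Preconditions.checkNotNull(committables)`; the effect can be sketched with the JDK's equivalent `Objects.requireNonNull` (a hedged stand-in so the snippet needs no Flink dependency):

```java
import java.util.List;
import java.util.Objects;

// Hypothetical, trimmed-down version of StreamingCommitterState with the null
// check the reviewer asks for: a null list now fails loudly at construction
// time instead of surfacing later as an NPE deep inside a commit.
final class StateSketch<CommT> {

    private final List<CommT> committables;

    StateSketch(List<CommT> committables) {
        // Flink style would be: this.committables = checkNotNull(committables);
        this.committables = Objects.requireNonNull(committables, "committables");
    }

    List<CommT> committables() {
        return committables;
    }
}
```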
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializer.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The serializer for the {@link StreamingCommitterState}.
+ */
+
+final class StreamingCommitterStateSerializer<CommT> implements SimpleVersionedSerializer<StreamingCommitterState<CommT>> {
+
+	private static final int MAGIC_NUMBER = 0xb91f252c;
+
+	private final SimpleVersionedSerializer<CommT> committableSerializer;
+
+	StreamingCommitterStateSerializer(SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committableSerializer = committableSerializer;

Review comment:
       `checkNotNull`?

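The `MAGIC_NUMBER` field suggests the usual guard pattern for versioned serializers: write a fixed marker first, verify it on read, and fail fast on corrupt or foreign bytes. A hedged sketch of that pattern follows; the real `serialize`/`deserialize` bodies are outside this diff, and the `String` payload and class name here are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only: serialize a String payload behind a magic number and
// reject any input whose marker does not match.
final class MagicSerializerSketch {

    static final int MAGIC_NUMBER = 0xb91f252c;

    static byte[] serialize(String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(MAGIC_NUMBER);   // the marker comes first
            out.writeUTF(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String deserialize(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int magic = in.readInt();
            if (magic != MAGIC_NUMBER) {
                throw new IllegalStateException(
                        "Corrupt state: unexpected magic number 0x"
                                + Integer.toHexString(magic));
            }
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```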



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * fc7d36d9e7e23f81de0d5cd090fbb818f41f5edd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8038) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   * 4873fb92710324b9d47682dcdfb707815d71bfc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892) 
   * 82614f698d02d8ce751d0fad7820b85c12e54fff UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] kl0u closed pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u closed pull request #13678:
URL: https://github.com/apache/flink/pull/13678


   





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 2798c88d1f869baf79e35af9ae67ac954f636857 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 82614f698d02d8ce751d0fad7820b85c12e54fff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904) 
   * 1816d296b693f771d3a5e0cf417bd1f30f302aad UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076) 
   * 9d7e355abedbffb48ef6fa865a3606148d2207d2 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8148) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 82614f698d02d8ce751d0fad7820b85c12e54fff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904) 
   * 1816d296b693f771d3a5e0cf417bd1f30f302aad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711438275


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 44eb4690cd88de72a900fbf6eebbcddb38f5e684 (Sun Oct 18 23:13:07 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 2798c88d1f869baf79e35af9ae67ac954f636857 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8051) 
   * 966c1aca195e5f2af7f32a485a48e0f5eebb35d0 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] guoweiM commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r508509632



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a
+ * {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit(List)}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;

Review comment:
    You raise a good point. I would like to open a follow-up PR for this.







[GitHub] [flink] kl0u commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r508475873



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a
+ * {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit(List)}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	private List<InputT> currentInputs;
+
+	/**
+	 * Notifies a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepares a commit.
+	 *
+	 * @param input A list of input elements received since last pre-commit
+	 *
+	 * @return A list of committables that can be committed when the following checkpoint completes.
+	 */
+	abstract List<CommT> preCommit(List<InputT> input);
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = checkNotNull(committer);
+		this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
+				checkNotNull(committableSerializer));
+		this.committablesPerCheckpoint = new TreeMap<>();
+		this.currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		streamingCommitterState = new SimpleVersionedListState<>(
+				context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+				streamingCommitterStateSerializer);
+		final List<CommT> restored = new ArrayList<>();
+		streamingCommitterState.get().forEach(s -> restored.addAll(s.getCommittables()));
+		recoveredCommittables(restored);
+	}
+
+	@Override
+	public void processElement(StreamRecord<InputT> element) throws Exception {
+		currentInputs.add(element.getValue());
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		committer.close();
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		committablesPerCheckpoint.put(context.getCheckpointId(), preCommit(currentInputs));
+		streamingCommitterState.update(
+				Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));
+		currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		commitUpTo(checkpointId);
+	}
+
+	private void commitUpTo(long checkpointId) throws Exception {
+		final Iterator<Map.Entry<Long, List<CommT>>>
+				it = committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
+
+		while (it.hasNext()) {
+			final Map.Entry<Long, List<CommT>> entry = it.next();
+			final List<CommT> neededToRetryCommittables = committer.commit(entry.getValue());

Review comment:
    Here we can fetch the value once (`List<CommT> committables = entry.getValue();`) and reuse it, instead of calling `entry.getValue()` multiple times.

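    The suggestion above can be sketched as follows. This is a minimal, hypothetical illustration with simplified stand-ins for the Flink types (the `Committer` interface here is not the real `org.apache.flink.api.connector.sink.Committer`), and since the quoted excerpt ends before showing how retries are handled, the `it.remove()` step is an assumption for demonstration purposes only:

    ```java
    import java.util.*;

    class CommitUpToSketch {
        // Simplified stand-in for Flink's Committer: commit() returns the
        // committables that still need to be retried.
        interface Committer<C> {
            List<C> commit(List<C> committables);
        }

        static <C> void commitUpTo(
                NavigableMap<Long, List<C>> committablesPerCheckpoint,
                Committer<C> committer,
                long checkpointId) {
            Iterator<Map.Entry<Long, List<C>>> it =
                    committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<C>> entry = it.next();
                // Fetch the value once and reuse it, per the review comment.
                List<C> committables = entry.getValue();
                List<C> needRetry = committer.commit(committables);
                if (needRetry.isEmpty()) {
                    it.remove(); // assumed behavior: drop fully committed checkpoints
                }
            }
        }
    }
    ```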
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing
+ * {@link GlobalCommitter} in the streaming execution mode.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The global committables that might need to be committed again after recovering from a failover. */
+	private List<GlobalCommT> recoveredGlobalCommittables;

Review comment:
    I think the code would be simpler if this became `final` and we called `clear()` whenever we want to re-initialize it, instead of setting it to `null`. This would remove all the `if (mylist == null) ...` checks.

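    The "final list + `clear()`" pattern suggested above can be sketched like this. It is a minimal illustration, not the actual operator: `String` stands in for the committable types, and `String.join` stands in for `globalCommitter.combine(input)`:

    ```java
    import java.util.*;

    class GlobalCommitterSketch {
        // Final and never null: re-initialized via clear(), so no
        // "if (list == null)" checks are needed anywhere.
        private final List<String> recoveredGlobalCommittables = new ArrayList<>();

        void recoveredCommittables(List<String> committables) {
            recoveredGlobalCommittables.clear();
            recoveredGlobalCommittables.addAll(committables);
        }

        List<String> preCommit(List<String> input) {
            final List<String> result = new ArrayList<>(recoveredGlobalCommittables);
            recoveredGlobalCommittables.clear();
            if (!input.isEmpty()) {
                result.add(String.join("+", input)); // stand-in for combine(input)
            }
            return result;
        }
    }
    ```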
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a
+ * {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit(List)}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;

Review comment:
    I would propose making this `private` and adding a getter that is visible for testing. In general I am not much in favour of `@VisibleForTesting` things; I would prefer passing a custom `Committer` into the test harness and using that to get whatever internal information we want, but I understand that this may require refactoring the tests, which may take some time.
    
    But that can go on the list of future things to do :)

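    The test-harness alternative mentioned above could look roughly like the recording committer below. This is a hypothetical sketch: the nested `Committer` interface is a simplified stand-in for `org.apache.flink.api.connector.sink.Committer`, reduced to the `commit`/`close` methods used in the quoted operator code:

    ```java
    import java.util.*;

    class RecordingCommitterSketch {
        // Simplified stand-in for Flink's Committer interface.
        interface Committer<C> {
            List<C> commit(List<C> committables);
            void close();
        }

        // A test-only Committer that records everything handed to commit(),
        // so tests can assert on committed values without @VisibleForTesting fields.
        static final class RecordingCommitter<C> implements Committer<C> {
            final List<C> committed = new ArrayList<>();

            @Override
            public List<C> commit(List<C> committables) {
                committed.addAll(committables);
                return Collections.emptyList(); // nothing needs retrying
            }

            @Override
            public void close() {}
        }
    }
    ```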
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterOperator.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing
+ * {@link Committer} in the streaming execution mode.
+ *
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+final class StreamingCommitterOperator<CommT> extends AbstractStreamingCommitterOperator<CommT, CommT> {
+
+	/** The committables that might need to be committed again after recovering from a failover. */
+	private List<CommT> recoveredCommittables;

Review comment:
    As in the global committer, I would make this `final` and simply clear it whenever we want to re-initialize it. I think that checking for `null` always leaves a possibility for error in the future, when someone else touches this code.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a
+ * {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit(List)}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	private List<InputT> currentInputs;
+
+	/**
+	 * Notifies a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepares a commit.
+	 *
+	 * @param input A list of input elements received since last pre-commit
+	 *
+	 * @return A list of committables that can be committed when the following checkpoint completes.
+	 */
+	abstract List<CommT> preCommit(List<InputT> input);
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = checkNotNull(committer);
+		this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
+				checkNotNull(committableSerializer));

Review comment:
    We do not need `checkNotNull` here because the `StreamingCommitterStateSerializer` constructor already performs that check.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing
+ * {@link GlobalCommitter} in the streaming execution mode.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The global committables that might need to be committed again after recovering from a failover. */
+	private List<GlobalCommT> recoveredGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = checkNotNull(globalCommitter);
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		recoveredGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);
+	}
+
+	@Override
+	List<GlobalCommT> preCommit(List<CommT> input) {
+		final List<GlobalCommT> result = new ArrayList<>();
+		if (recoveredGlobalCommittables != null && !recoveredGlobalCommittables.isEmpty()) {

Review comment:
    With the above changes, this can become:
   
   ```
   final List<GlobalCommT> result =
   				new ArrayList<>(recoveredGlobalCommittables);
   recoveredGlobalCommittables.clear();
   
   if (!input.isEmpty()) {
   	result.add(globalCommitter.combine(input));
   }
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a
+ * {@link org.apache.flink.api.connector.sink.GlobalCommitter} in the streaming execution mode.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit(List)}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	private List<InputT> currentInputs;
+
+	/**
+	 * Notifies a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepares a commit.
+	 *
+	 * @param input A list of input elements received since last pre-commit
+	 *
+	 * @return A list of committables that can be committed when the next checkpoint completes.
+	 */
+	abstract List<CommT> preCommit(List<InputT> input);
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = checkNotNull(committer);
+		this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
+				checkNotNull(committableSerializer));
+		this.committablesPerCheckpoint = new TreeMap<>();
+		this.currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		streamingCommitterState = new SimpleVersionedListState<>(
+				context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+				streamingCommitterStateSerializer);
+		final List<CommT> restored = new ArrayList<>();
+		streamingCommitterState.get().forEach(s -> restored.addAll(s.getCommittables()));
+		recoveredCommittables(restored);
+	}
+
+	@Override
+	public void processElement(StreamRecord<InputT> element) throws Exception {
+		currentInputs.add(element.getValue());
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		committer.close();
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		committablesPerCheckpoint.put(context.getCheckpointId(), preCommit(currentInputs));
+		streamingCommitterState.update(
+				Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));
+		currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		commitUpTo(checkpointId);
+	}
+
+	private void commitUpTo(long checkpointId) throws Exception {
+		final Iterator<Map.Entry<Long, List<CommT>>>
+				it = committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
+
+		while (it.hasNext()) {
+			final Map.Entry<Long, List<CommT>> entry = it.next();
+			final List<CommT> neededToRetryCommittables = committer.commit(entry.getValue());
+
+			if (!neededToRetryCommittables.isEmpty()) {
+				throw new UnsupportedOperationException("Re-committing is currently not supported!");
+			}
+

Review comment:
       What about logging here that we are `committing state for checkpoint XXXX`? This may help if something goes wrong.
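   For illustration, a minimal self-contained sketch (plain Java, not the actual Flink operator; `committablesPerCheckpoint` mirrors the PR's field, the committer call is elided) of where the suggested log line would sit in the commit loop:

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   import java.util.logging.Logger;

   public class CommitUpToSketch {

       private static final Logger LOG = Logger.getLogger(CommitUpToSketch.class.getName());

       // Committables grouped by checkpoint id, mirroring committablesPerCheckpoint in the PR.
       static final NavigableMap<Long, List<String>> committablesPerCheckpoint = new TreeMap<>();

       /** Commits everything up to (and including) the given checkpoint id, logging each step. */
       static List<String> commitUpTo(long checkpointId) {
           List<String> committed = new ArrayList<>();
           // headMap(checkpointId, true) is inclusive, as in the operator's commitUpTo.
           Iterator<Map.Entry<Long, List<String>>> it =
                   committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
           while (it.hasNext()) {
               Map.Entry<Long, List<String>> entry = it.next();
               // The log line suggested in the review, emitted before the actual commit.
               LOG.info("Committing state for checkpoint " + entry.getKey());
               committed.addAll(entry.getValue());
               it.remove();
           }
           return committed;
       }

       public static void main(String[] args) {
           committablesPerCheckpoint.put(1L, List.of("a"));
           committablesPerCheckpoint.put(2L, List.of("b"));
           committablesPerCheckpoint.put(3L, List.of("c"));
           System.out.println(commitUpTo(2L)); // commits checkpoints 1 and 2 only
       }
   }
   ```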

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing
+ * {@link GlobalCommitter} in the streaming execution mode.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private final GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The global committables that might need to be committed again after recovering from a failover. */
+	private List<GlobalCommT> recoveredGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = checkNotNull(globalCommitter);
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		recoveredGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);

Review comment:
       Here we should check that the committables are not `null`, to be safe. In addition, I would propose not to reuse the same list that we expose to the users, but to copy the result of `filterRecoveredCommittables()` into a list of our own.
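   A hedged sketch of the proposed pattern (plain Java; `filterRecoveredCommittables` is simulated here rather than taken from a real `GlobalCommitter`, and the field name follows the PR):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;

   public class RecoverySketch {

       private List<String> recoveredGlobalCommittables;

       // Stand-in for globalCommitter.filterRecoveredCommittables(...): keeps non-empty entries.
       static List<String> filterRecoveredCommittables(List<String> committables) {
           List<String> kept = new ArrayList<>();
           for (String c : committables) {
               if (!c.isEmpty()) {
                   kept.add(c);
               }
           }
           return kept;
       }

       void recoveredCommittables(List<String> committables) {
           List<String> filtered = filterRecoveredCommittables(committables);
           // 1. Fail fast if a user-provided implementation returns null.
           // 2. Copy into our own list so later mutation on the user side cannot
           //    corrupt the operator's state.
           recoveredGlobalCommittables = new ArrayList<>(
                   Objects.requireNonNull(filtered, "filterRecoveredCommittables returned null"));
       }

       List<String> getRecovered() {
           return recoveredGlobalCommittables;
       }

       public static void main(String[] args) {
           RecoverySketch op = new RecoverySketch();
           op.recoveredCommittables(List.of("x", "", "y"));
           System.out.println(op.getRecovered()); // [x, y]
       }
   }
   ```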




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] kl0u commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507835140



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link StreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+@Internal
+public class StreamingCommitterOperatorFactory<CommT> extends AbstractStreamingCommitterOperatorFactory<CommT, CommT> {
+
+	private final Sink<?, CommT, ?, ?> sink;
+
+	public StreamingCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {
+		this.sink = sink;
+	}
+
+	@Override
+	AbstractStreamingCommitterOperator<CommT, CommT> createStreamingCommitterOperator() {
+		return new StreamingCommitterOperator<>(
+				sink.createCommitter()
+						.orElseThrow(() -> new IllegalArgumentException(
+								"Could not create committer from the sink")),
+				sink.getCommittableSerializer()
+						.orElseThrow(() -> new IllegalArgumentException(
+								"Could not get committable serializer from the sink")));
+	}
+
+	@Override
+	public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+		return AbstractStreamingCommitterOperator.class;

Review comment:
       Here it should be `StreamingCommitterOperator`, right?







[GitHub] [flink] guoweiM commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-714048464


   @flinkbot run azure





[GitHub] [flink] gaoyunhaii commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507586576



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The current pending global committables. */
+	private List<GlobalCommT> currentGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = globalCommitter;
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		this.currentGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);

Review comment:
       Remove `this.` where it is not needed, unless we are in a constructor.







[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   * 4873fb92710324b9d47682dcdfb707815d71bfc3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] guoweiM commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507869520



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializerTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link StreamingCommitterStateSerializer}.
+ */
+public class StreamingCommitterStateSerializerTest {
+
+	@Test
+	public void serializeNonEmptyState() throws IOException {
+		final StreamingCommitterState<String> expectedStreamingCommitterState =
+				new StreamingCommitterState<>(Arrays.asList("city", "great", "temper", "valley"));
+		final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer =
+				new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE);
+
+		final byte[] serialize = streamingCommitterStateSerializer.serialize(
+				expectedStreamingCommitterState);
+		final StreamingCommitterState<String> streamingCommitterState =
+				streamingCommitterStateSerializer.deserialize(
+						streamingCommitterStateSerializer.getVersion(),
+						serialize);
+
+		assertThat(

Review comment:
       I would prefer to use `assertThat` for all assertions.







[GitHub] [flink] gaoyunhaii commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507569024



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;
+
+	/**
+	 * Notify a list of committables that might need to be committed again after recovering from a failover.

Review comment:
       One small formatting issue: we consistently use the third person singular for method/class comments that start with a verb, so it should be `Notifies` instead of `Notify` here, and similarly for the other comments in this PR.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;

Review comment:
       could be `final`

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Sub-classes are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Group the committable by the checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committable to the external system. **/
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;

Review comment:
       The relationship between `currentInputs` and `currentGlobalCommittables` seems a little hard to follow. As a whole, I would tend to:
   
   1. Make `currentInputs` available only in the abstract class, pass it to `preCommit` as a parameter, and clear it in the abstract class.
   2. In `StreamingCommitterOperator`, save the result of `recoveredCommittables` in a standalone list, as is done in `GlobalStreamingCommitterOperator`. The recovered committables are semantically not inputs.
   3. In `StreamingCommitterOperator` and `GlobalStreamingCommitterOperator`, rename `currentGlobalCommittables` to `recovered(Global)Committables`.
   
   One pending question: do we need to commit the recovered committables immediately? The current implementation of `StreamingFileSink` does so.
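   The restructuring proposed in point 1 (the base class owns the input buffer, hands it to `preCommit`, and resets it itself) can be sketched in plain Java; class and method names here are illustrative stand-ins, not the Flink API:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   abstract class CommitterBase<I, C> {

       // Owned, passed, and cleared only by the base class, as proposed.
       private List<I> currentInputs = new ArrayList<>();

       /** Sub-classes turn the buffered inputs into committables; they never touch the buffer itself. */
       abstract List<C> preCommit(List<I> inputs);

       void processElement(I element) {
           currentInputs.add(element);
       }

       List<C> snapshot() {
           List<C> committables = preCommit(currentInputs);
           currentInputs = new ArrayList<>(); // reset for the next checkpoint interval
           return committables;
       }
   }

   class UpperCaseCommitter extends CommitterBase<String, String> {
       @Override
       List<String> preCommit(List<String> inputs) {
           List<String> out = new ArrayList<>(inputs.size());
           for (String s : inputs) {
               out.add(s.toUpperCase());
           }
           return out;
       }
   }

   public class PreCommitSketch {
       public static void main(String[] args) {
           UpperCaseCommitter op = new UpperCaseCommitter();
           op.processElement("a");
           op.processElement("b");
           System.out.println(op.snapshot()); // [A, B]
           System.out.println(op.snapshot()); // [] (the base class cleared the buffer)
       }
   }
   ```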

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperator.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link GlobalCommitter}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+@Internal
+public final class GlobalStreamingCommitterOperator<CommT, GlobalCommT> extends AbstractStreamingCommitterOperator<CommT, GlobalCommT>
+		implements BoundedOneInput {
+
+	/** Aggregate committables to global committables and commit the global committables to the external system. */
+	private GlobalCommitter<CommT, GlobalCommT> globalCommitter;
+
+	/** The current pending global committables. */
+	private List<GlobalCommT> currentGlobalCommittables;
+
+	private boolean endOfInput;
+
+	GlobalStreamingCommitterOperator(
+			GlobalCommitter<CommT, GlobalCommT> globalCommitter,
+			SimpleVersionedSerializer<GlobalCommT> committableSerializer) {
+		super(globalCommitter, committableSerializer);
+		this.globalCommitter = globalCommitter;
+		this.endOfInput = false;
+	}
+
+	@Override
+	void recoveredCommittables(List<GlobalCommT> committables) {
+		this.currentGlobalCommittables = globalCommitter.filterRecoveredCommittables(committables);

Review comment:
       Remove `this.` if not needed

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractStreamingCommitterOperator.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Abstract base class for operators that work with a {@link Committer} or a {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+ *
+ * <p>Subclasses are responsible for implementing {@link #recoveredCommittables(List)} and {@link #preCommit()}.
+ *
+ * @param <InputT> The input type of the {@link Committer}.
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+abstract class AbstractStreamingCommitterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
+		implements OneInputStreamOperator<InputT, CommT> {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The operator's state descriptor. */
+	static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
+			new ListStateDescriptor<>(
+					"streaming_committer_raw_states",
+					BytePrimitiveArraySerializer.INSTANCE);
+
+	/** Groups the committables by checkpoint id. */
+	private final NavigableMap<Long, List<CommT>> committablesPerCheckpoint;
+
+	/** The committable's serializer. */
+	private final StreamingCommitterStateSerializer<CommT> streamingCommitterStateSerializer;
+
+	/** Responsible for committing the committables to the external system. */
+	protected final Committer<CommT> committer;
+
+	/** The operator's state. */
+	private ListState<StreamingCommitterState<CommT>> streamingCommitterState;
+
+	/** Inputs collected between every pre-commit. */
+	protected List<InputT> currentInputs;
+
+	/**
+	 * Notifies the operator of a list of committables that might need to be committed again after recovering from a failover.
+	 *
+	 * @param committables A list of committables
+	 */
+	abstract void recoveredCommittables(List<CommT> committables);
+
+	/**
+	 * Prepare a commit.
+	 *
+	 * @return A list of committables that should be committed when the next checkpoint completes.
+	 */
+	abstract List<CommT> preCommit();
+
+	AbstractStreamingCommitterOperator(
+			Committer<CommT> committer,
+			SimpleVersionedSerializer<CommT> committableSerializer) {
+		this.committer = committer;
+		this.streamingCommitterStateSerializer = new StreamingCommitterStateSerializer<>(
+				committableSerializer);
+		this.committablesPerCheckpoint = new TreeMap<>();
+		this.currentInputs = new ArrayList<>();
+	}
+
+	@Override
+	public void initializeState(StateInitializationContext context) throws Exception {
+		super.initializeState(context);
+		streamingCommitterState = new SimpleVersionedListState<>(
+				context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC),
+				streamingCommitterStateSerializer);
+		final List<CommT> restored = new ArrayList<>();
+		streamingCommitterState.get().forEach(s -> restored.addAll(s.getCommittables()));
+		recoveredCommittables(restored);
+	}
+
+	@Override
+	public void processElement(StreamRecord<InputT> element) throws Exception {
+		currentInputs.add(element.getValue());
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		committer.close();
+	}
+
+	@Override
+	public void snapshotState(StateSnapshotContext context) throws Exception {
+		super.snapshotState(context);
+		committablesPerCheckpoint.put(context.getCheckpointId(), preCommit());
+		streamingCommitterState.update(
+				Collections.singletonList(new StreamingCommitterState<>(committablesPerCheckpoint)));
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		super.notifyCheckpointComplete(checkpointId);
+		commitUpTo(checkpointId);
+	}
+
+	private void commitUpTo(long checkpointId) throws Exception {
+		final Iterator<Map.Entry<Long, List<CommT>>>
+				it = committablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
+
+		while (it.hasNext()) {
+

Review comment:
       Remove this empty line?
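For readers following the review, the `commitUpTo` loop quoted above is truncated here; the commit-up-to-a-checkpoint pattern it implements can be sketched on its own (class and method names below are illustrative, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Minimal sketch: pending committables are grouped by checkpoint id in a
// sorted map; on notifyCheckpointComplete(id), everything up to and
// including that id is committed and removed so it is never committed twice.
final class CommitTracker<T> {
	private final NavigableMap<Long, List<T>> pending = new TreeMap<>();

	void addPending(long checkpointId, List<T> committables) {
		pending.put(checkpointId, committables);
	}

	// Returns the committed elements so a caller (or test) can inspect them.
	List<T> commitUpTo(long checkpointId) {
		final List<T> committed = new ArrayList<>();
		final Iterator<Map.Entry<Long, List<T>>> it =
				pending.headMap(checkpointId, true).entrySet().iterator();
		while (it.hasNext()) {
			committed.addAll(it.next().getValue());
			it.remove(); // drop the entry so the checkpoint is committed once
		}
		return committed;
	}
}
```

The `headMap(checkpointId, true)` view is the key piece: it covers exactly the checkpoints at or below the completed one, and removing through its iterator also removes from the backing map.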




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 44eb4690cd88de72a900fbf6eebbcddb38f5e684 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816) 
   * b31d0fdb8b5336a91231fad0db55afc072400932 UNKNOWN
   


----------------------------------------------------------------



[GitHub] [flink] kl0u commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507834050



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link GlobalStreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+public class GlobalStreamingCommitterOperatorFactory<CommT, GlobalCommT> extends AbstractStreamingCommitterOperatorFactory<CommT, GlobalCommT> {
+
+	private final Sink<?, CommT, ?, GlobalCommT> sink;
+
+	public GlobalStreamingCommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink) {
+		this.sink = sink;
+	}
+
+	@Override
+	AbstractStreamingCommitterOperator<CommT, GlobalCommT> createStreamingCommitterOperator() {
+		return new GlobalStreamingCommitterOperator<>(
+				sink.createGlobalCommitter()
+						.orElseThrow(() -> new IllegalArgumentException(

Review comment:
       Probably throw an `IllegalStateException` instead of an `IllegalArgumentException`. What do you think @guoweiM ?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link GlobalStreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link GlobalCommitter}.
+ */
+public class GlobalStreamingCommitterOperatorFactory<CommT, GlobalCommT> extends AbstractStreamingCommitterOperatorFactory<CommT, GlobalCommT> {
+
+	private final Sink<?, CommT, ?, GlobalCommT> sink;
+
+	public GlobalStreamingCommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink) {
+		this.sink = sink;
+	}
+
+	@Override
+	AbstractStreamingCommitterOperator<CommT, GlobalCommT> createStreamingCommitterOperator() {
+		return new GlobalStreamingCommitterOperator<>(
+				sink.createGlobalCommitter()
+						.orElseThrow(() -> new IllegalArgumentException(
+								"Could not create global committer from the sink")),
+				sink.getGlobalCommittableSerializer()
+						.orElseThrow(() -> new IllegalArgumentException(

Review comment:
       Same as above.
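The suggested change amounts to swapping the exception type inside `orElseThrow`: an absent committer reflects an inconsistent sink configuration rather than a bad caller argument. An illustrative sketch of the suggested shape (the helper name here is hypothetical):

```java
import java.util.Optional;

// Illustrative: unwrap an Optional committer, failing with
// IllegalStateException (as the reviewer suggests) when it is absent.
final class CommitterUnwrap {
	static <C> C requireCommitter(Optional<C> maybeCommitter) {
		return maybeCommitter.orElseThrow(() -> new IllegalStateException(
				"Could not create committer from the sink"));
	}
}
```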




----------------------------------------------------------------



[GitHub] [flink] guoweiM commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
guoweiM commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507868806



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateTest.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Test {@link StreamingCommitterState}.
+ */
+public class StreamingCommitterStateTest {
+
+	@Test
+	public void constructFromMap() {
+		final NavigableMap<Long, List<Integer>> r = new TreeMap<>();
+		final List<Integer> expectedList = Arrays.asList(
+				0,

Review comment:
       I'm using the new formatter 😅




----------------------------------------------------------------



[GitHub] [flink] flinkbot commented on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 44eb4690cd88de72a900fbf6eebbcddb38f5e684 UNKNOWN
   


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 82614f698d02d8ce751d0fad7820b85c12e54fff Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904) 
   


----------------------------------------------------------------



[GitHub] [flink] kl0u commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507834350



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link StreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link Committer}.
+ */
+@Internal
+public class StreamingCommitterOperatorFactory<CommT> extends AbstractStreamingCommitterOperatorFactory<CommT, CommT> {
+
+	private final Sink<?, CommT, ?, ?> sink;
+
+	public StreamingCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {
+		this.sink = sink;
+	}
+
+	@Override
+	AbstractStreamingCommitterOperator<CommT, CommT> createStreamingCommitterOperator() {
+		return new StreamingCommitterOperator<>(
+				sink.createCommitter()
+						.orElseThrow(() -> new IllegalArgumentException(
+								"Could not create committer from the sink")),

Review comment:
       Same as in the other factory.




----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   * 2ba5bda5c5a15c370c231e1cedfa03fd0bc0b41b Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8076) 
   * 9d7e355abedbffb48ef6fa865a3606148d2207d2 UNKNOWN
   


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   * 4873fb92710324b9d47682dcdfb707815d71bfc3 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7892) 
   * 82614f698d02d8ce751d0fad7820b85c12e54fff Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7904) 
   


----------------------------------------------------------------



[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   *  Unknown: [CANCELED](TBD) 
   * 463b5c8ed21f93caaeb7b938aa9e72abb35619b2 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 44eb4690cd88de72a900fbf6eebbcddb38f5e684 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7816) 
   * b31d0fdb8b5336a91231fad0db55afc072400932 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7876) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13678:
URL: https://github.com/apache/flink/pull/13678#issuecomment-711439155


   ## CI report:
   
   * 1816d296b693f771d3a5e0cf417bd1f30f302aad Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=7955) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] gaoyunhaii commented on a change in pull request #13678: [FLINK-19586] Add stream committer operators for new Sink API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507612814



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateTest.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Test {@link StreamingCommitterState}.
+ */
+public class StreamingCommitterStateTest {
+
+	@Test
+	public void constructFromMap() {
+		final NavigableMap<Long, List<Integer>> r = new TreeMap<>();
+		final List<Integer> expectedList = Arrays.asList(
+				0,

Review comment:
       I'd tend to put the numbers on one line.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializerTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link StreamingCommitterStateSerializer}.
+ */
+public class StreamingCommitterStateSerializerTest {
+
+	@Test
+	public void serializeNonEmptyState() throws IOException {
+		final StreamingCommitterState<String> expectedStreamingCommitterState =
+				new StreamingCommitterState<>(Arrays.asList("city", "great", "temper", "valley"));
+		final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer =
+				new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE);
+
+		final byte[] serialize = streamingCommitterStateSerializer.serialize(
+				expectedStreamingCommitterState);
+		final StreamingCommitterState<String> streamingCommitterState =
+				streamingCommitterStateSerializer.deserialize(
+						streamingCommitterStateSerializer.getVersion(),
+						serialize);
+
+		assertThat(
+				streamingCommitterState.getCommittables(),
+				equalTo(
+						expectedStreamingCommitterState.getCommittables()));
+	}
+
+	@Test
+	public void serializeEmptyState() throws IOException {
+		final StreamingCommitterState<String> expectedStreamingCommitterState =
+				new StreamingCommitterState<>(Collections.emptyList());
+		final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer =
+				new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE);
+
+		final byte[] serialize = streamingCommitterStateSerializer.serialize(
+				expectedStreamingCommitterState);
+		final StreamingCommitterState<String> streamingCommitterState =
+				streamingCommitterStateSerializer.deserialize(
+						streamingCommitterStateSerializer.getVersion(),
+						serialize);
+
+		assertThat(

Review comment:
       Could this be simplified to `assertEquals(expectedStreamingCommitterState, streamingCommitterState);`?
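
       The one-line `assertEquals` only works if the state class overrides `equals()`. A minimal sketch of what that implies (the `State` class and its field are hypothetical stand-ins for `StreamingCommitterState`, not the actual Flink code):

       ```java
       import java.util.Arrays;
       import java.util.List;
       import java.util.Objects;

       public class StateEqualsSketch {
           // Hypothetical stand-in for StreamingCommitterState; assertEquals on two
           // instances only works if equals() compares the wrapped committables.
           static final class State {
               private final List<String> committables;

               State(List<String> committables) {
                   this.committables = committables;
               }

               @Override
               public boolean equals(Object o) {
                   return o instanceof State
                           && committables.equals(((State) o).committables);
               }

               @Override
               public int hashCode() {
                   return Objects.hash(committables);
               }
           }

           public static void main(String[] args) {
               State expected = new State(Arrays.asList("city", "great"));
               State actual = new State(Arrays.asList("city", "great"));
               // With equals() in place, the round-trip check collapses to one
               // equality assertion instead of comparing getCommittables() lists.
               System.out.println(expected.equals(actual));
           }
       }
       ```

       Without such an `equals()` override, the existing comparison of `getCommittables()` is the safer form.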

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateTest.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Test {@link StreamingCommitterState}.
+ */
+public class StreamingCommitterStateTest {
+
+	@Test
+	public void constructFromMap() {
+		final NavigableMap<Long, List<Integer>> r = new TreeMap<>();
+		final List<Integer> expectedList = Arrays.asList(
+				0,
+				1,
+				2,
+				3,
+				10,
+				11,
+				12,
+				13,
+				30,
+				31,
+				32,
+				33);
+
+		r.put(1L, Arrays.asList(10, 11, 12, 13));
+		r.put(0L, Arrays.asList(0, 1, 2, 3));
+		r.put(3L, Arrays.asList(30, 31, 32, 33));
+
+		final StreamingCommitterState<Integer> streamingCommitterState = new StreamingCommitterState<>(
+				r);
+
+		Assert.assertArrayEquals(

Review comment:
       Could be simplified to `Assert.assertEquals(expectedList, streamingCommitterState.getCommittables());`?
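
       For the list case no extra `equals()` is needed: `java.util.List#equals` is already an order-sensitive, element-wise comparison, so a single `assertEquals` on the lists is equivalent to `assertArrayEquals` on converted arrays. A quick self-contained sketch:

       ```java
       import java.util.Arrays;
       import java.util.List;

       public class ListEqualsSketch {
           public static void main(String[] args) {
               List<Integer> expected = Arrays.asList(0, 1, 2, 3, 10, 11, 12, 13);
               List<Integer> actual = Arrays.asList(0, 1, 2, 3, 10, 11, 12, 13);
               // List#equals compares size, order, and elements, so this single
               // check replaces converting both sides to arrays for assertArrayEquals.
               System.out.println(expected.equals(actual));
               // Order matters: a reordered list is not equal.
               System.out.println(expected.equals(
                       Arrays.asList(13, 12, 11, 10, 3, 2, 1, 0)));
           }
       }
       ```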

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for Tests for subclasses of {@link AbstractWriterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(

Review comment:
       Might be simplified to `assertTrue`.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for Tests for subclasses of {@link AbstractWriterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(
+				getCommitter(testHarness).isClosed(),
+				is(true));
+	}
+
+	@Test
+	public void restoredFromMergedState() throws Exception {
+

Review comment:
       Remove the empty line?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializerTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Test {@link StreamingCommitterStateSerializer}.
+ */
+public class StreamingCommitterStateSerializerTest {
+
+	@Test
+	public void serializeNonEmptyState() throws IOException {
+		final StreamingCommitterState<String> expectedStreamingCommitterState =
+				new StreamingCommitterState<>(Arrays.asList("city", "great", "temper", "valley"));
+		final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer =
+				new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE);
+
+		final byte[] serialize = streamingCommitterStateSerializer.serialize(
+				expectedStreamingCommitterState);
+		final StreamingCommitterState<String> streamingCommitterState =
+				streamingCommitterStateSerializer.deserialize(
+						streamingCommitterStateSerializer.getVersion(),
+						serialize);
+
+		assertThat(

Review comment:
       Could this be simplified to `assertEquals(expectedStreamingCommitterState, streamingCommitterState);`?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for Tests for subclasses of {@link AbstractWriterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(
+				getCommitter(testHarness).isClosed(),
+				is(true));
+	}
+
+	@Test
+	public void restoredFromMergedState() throws Exception {
+
+		final List<String> input1 = Arrays.asList("today", "whom");
+		final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1);
+
+		final List<String> input2 = Arrays.asList("future", "evil", "how");
+		final OperatorSubtaskState operatorSubtaskState2 = buildSubtaskState(input2);
+
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+
+		final OperatorSubtaskState mergedOperatorSubtaskState =
+				OneInputStreamOperatorTestHarness.repackageState(
+						operatorSubtaskState1,
+						operatorSubtaskState2);
+
+		testHarness.initializeState(mergedOperatorSubtaskState);
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator();
+
+		final List<String> expectedStates = new ArrayList<>();
+		expectedStates.addAll(getExpectedCommittables(input1, testHarness));
+		expectedStates.addAll(getExpectedCommittables(input2, testHarness));
+		assertThat(
+				expectedStates,
+				equalTo(
+						streamingCommitterOperator.getCurrentCommittables()));
+		testHarness.close();
+	}
+
+	@Test
+	public void commitMultipleStagesTogether() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedOutput = new ArrayList<>();
+		expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar")));
+
+		completeCheckpoint(testHarness, 3, expectedOutput);
+
+		testHarness.close();
+	}
+
+	/**
+	 * Process the input, take a snapshot, and return the resulting operator subtask state.
+	 */
+	protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> committables = getExpectedCommittables(input, testHarness);
+		process(testHarness, input);
+		final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables);
+		testHarness.close();
+
+		return operatorSubtaskState;
+	}
+
+	private List<String> getExpectedCommittables(
+			List<String> committables,
+			OneInputStreamOperatorTestHarness<String, String> testHarness) {
+		final AbstractTestCommitter<String> committer = getCommitter(testHarness);
+		if (committer instanceof GlobalCommitter) {
+			final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer;
+			return Collections.singletonList(globalCommitter.combine(committables));
+		}
+		return committables;
+	}
+
+	// ------------------------ Subclasses should override or implement the following methods --------------------------
+
+	abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator(
+			TestSink<?, String, String, String> sink);
+
+	GlobalCommitter<String, String> createGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createAlwaysRetryCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	// -------------------------------- OneInputStreamOperatorTestHarness Factory --------------------------------------
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness(
+			OperatorSubtaskState operatorSubtaskState) throws Exception {
+
+		final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness =
+				new OneInputStreamOperatorTestHarness<>(
+						createStreamingCommitterOperator(
+								TestSink.create(
+										() -> DEFAULT_WRITER,
+										() -> Optional.of(createCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+										() -> Optional.of(createGlobalCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+						StringSerializer.INSTANCE);
+		operatorTestHarness.initializeState(operatorSubtaskState);
+		return operatorTestHarness;
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception {
+
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createAlwaysRetryCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createAlwaysRetryGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.empty(),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.empty())), StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Base class for our test implementations of {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}.
+	 */
+	abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> {
+
+		protected List<CommT> committedData;
+
+		private boolean isClosed;
+
+		public AbstractTestCommitter() {
+			this.committedData = new ArrayList<>();
+			this.isClosed = false;
+		}
+
+		public List<CommT> getCommittedData() {
+			return committedData;
+		}
+
+		@Override
+		public void close() throws Exception {
+			isClosed = true;
+		}
+
+		public boolean isClosed() {
+			return isClosed;
+		}
+	}
+
+	// ------------------------------------------- Utils for testing ---------------------------------------------------
+
+	/**
+	 * Process the input and return the pre-commit results.
+	 */
+	private List<String> preCommit(
+			OneInputStreamOperatorTestHarness<String, String> testHarness,
+			long checkpointId,
+			List<String> input) throws Exception {
+
+		process(testHarness, input);
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+		snapshot(testHarness, checkpointId, 1L, expectedCommittables);
+		return expectedCommittables;
+	}
+
+	private <InputT, CommT> void process(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			List<InputT> inputs) {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+		final List<StreamRecord<InputT>> inputStreamRecords = inputs
+				.stream()
+				.map(StreamRecord::new)
+				.collect(Collectors.toList());
+
+		inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement));
+
+		// verify the currently received committables
+		Assert.assertArrayEquals(
+				inputs.toArray(),
+				streamingCommitterOperator.getCurrentInputs().toArray());
+	}
+
+	<InputT, CommT> OperatorSubtaskState snapshot(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			long checkpointId,
+			long checkpointTimestamp,
+			List<CommT> expectedCommittables) throws Exception {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+
+		final List<CommT> expectedState =
+				streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream()
+						.flatMap(Collection::stream)
+						.collect(Collectors.toList());
+		expectedState.addAll(expectedCommittables);
+
+		final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>(
+				streamingCommitterOperator.getCommittablesPerCheckpoint());
+		final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList(
+				new StreamingCommitterState<>(expectedState));
+
+		expectedCommittablesPerCheckpoint.put(
+				checkpointId,
+				streamingCommitterOperator.getCurrentCommittables());
+
+		final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(
+				checkpointId,
+				checkpointTimestamp);
+
+		// verify the current inputs are cleaned up after checkpointing
+		assertThat(streamingCommitterOperator.getCurrentInputs(), is(empty()));
+
+		// verify the state
+		final List<StreamingCommitterState<CommT>> operatorState =
+				CollectionUtil.iterableToList(streamingCommitterOperator.getState().get());
+
+		assertThat(operatorState, equalTo(expectedStreamingCommitterState));
+
+		// verify committables per checkpoint
+		assertThat(
+				streamingCommitterOperator.getCommittablesPerCheckpoint(),
+				equalTo(expectedCommittablesPerCheckpoint));
+		return operatorSubtaskState;
+	}
+
+	<InputT, CommT> void completeCheckpoint(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			long checkpoint,
+			List<CommT> expectedOutput) throws Exception {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+
+		final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint =
+				new TreeMap<>(streamingCommitterOperator.getCommittablesPerCheckpoint());
+
+		expectedCommittablesPerCheckpoint.entrySet().removeIf(it -> it.getKey() <= checkpoint);
+		final List<CommT> sinkOutput = getCommitter(testHarness).getCommittedData();
+
+		testHarness.notifyOfCompletedCheckpoint(checkpoint);
+
+		// verify committables per checkpoint
+		assertThat(

Review comment:
       Similarly might change to `assertEquals` for these three asserts
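       A sketch of what that suggestion could look like for one of the three asserts. JUnit 4's `assertEquals(Object, Object)` falls back to `Object.equals`, which is entry-wise for `java.util` maps, so it checks the same condition as Hamcrest's `equalTo`. The `assertEquals` below is a minimal hand-rolled stand-in so the sketch runs without a JUnit dependency, and the map contents are made-up placeholders for `expectedCommittablesPerCheckpoint` and `getCommittablesPerCheckpoint()` from the diff above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class AssertEqualsSketch {

	// Minimal stand-in for org.junit.Assert.assertEquals(Object, Object),
	// included only so this sketch compiles without JUnit on the classpath.
	static void assertEquals(Object expected, Object actual) {
		if (expected == null ? actual != null : !expected.equals(actual)) {
			throw new AssertionError("expected: " + expected + " but was: " + actual);
		}
	}

	public static void main(String[] args) {
		// Placeholder values standing in for the operator state in the diff above.
		NavigableMap<Long, List<String>> expected = new TreeMap<>();
		expected.put(1L, Arrays.asList("cautious", "nature"));
		NavigableMap<Long, List<String>> actual = new TreeMap<>();
		actual.put(1L, Arrays.asList("cautious", "nature"));

		// Instead of: assertThat(actual, equalTo(expected));
		assertEquals(expected, actual);
		System.out.println("maps compare equal entry-wise");
	}
}
```

       Either form fails with a readable expected/actual message; the main difference is that Hamcrest needs the extra `equalTo` wrapper.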

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+
+	private <InputT, CommT> void process(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			List<InputT> inputs) {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+		final List<StreamRecord<InputT>> inputStreamRecords = inputs
+				.stream()
+				.map(StreamRecord::new)
+				.collect(Collectors.toList());
+
+		inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement));
+
+		// verify the currently received committables
+		Assert.assertArrayEquals(

Review comment:
       might change to `assertEquals`
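       For this assert in particular, `List.equals` is element-wise and order-sensitive, so comparing the lists directly checks the same thing as converting both sides with `toArray()` first. A minimal runnable sketch — the `assertEquals` here is a hand-rolled stand-in for JUnit's, and the values are placeholders for `inputs` and `getCurrentInputs()` from the diff above:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ListAssertSketch {

	// Stand-in for org.junit.Assert.assertEquals(Object, Object) so this
	// sketch runs without a JUnit dependency.
	static void assertEquals(Object expected, Object actual) {
		if (!Objects.equals(expected, actual)) {
			throw new AssertionError("expected: " + expected + " but was: " + actual);
		}
	}

	public static void main(String[] args) {
		List<String> inputs = Arrays.asList("lazy", "leaf");
		List<String> currentInputs = Arrays.asList("lazy", "leaf");

		// Instead of:
		// Assert.assertArrayEquals(inputs.toArray(), currentInputs.toArray());
		assertEquals(inputs, currentInputs);
		System.out.println("lists compare equal element-wise");
	}
}
```

       Dropping the `toArray()` conversion also produces a clearer failure message, since the list contents are printed directly.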

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for Tests for subclasses of {@link AbstractWriterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(
+				getCommitter(testHarness).isClosed(),
+				is(true));
+	}
+
+	@Test
+	public void restoredFromMergedState() throws Exception {
+
+		final List<String> input1 = Arrays.asList("today", "whom");
+		final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1);
+
+		final List<String> input2 = Arrays.asList("future", "evil", "how");
+		final OperatorSubtaskState operatorSubtaskState2 = buildSubtaskState(input2);
+
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+
+		final OperatorSubtaskState mergedOperatorSubtaskState =
+				OneInputStreamOperatorTestHarness.repackageState(
+						operatorSubtaskState1,
+						operatorSubtaskState2);
+
+		testHarness.initializeState(mergedOperatorSubtaskState);
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator();
+
+		final List<String> expectedStates = new ArrayList<>();
+		expectedStates.addAll(getExpectedCommittables(input1, testHarness));
+		expectedStates.addAll(getExpectedCommittables(input2, testHarness));
+		assertThat(
+				expectedStates,
+				equalTo(
+						streamingCommitterOperator.getCurrentCommittables()));
+		testHarness.close();
+	}
+
+	@Test
+	public void commitMultipleStagesTogether() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedOutput = new ArrayList<>();
+		expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar")));
+
+		completeCheckpoint(testHarness, 3, expectedOutput);
+
+		testHarness.close();
+	}
+
+	/**
+	 * Process the input and return the snapshot.
+	 */
+	protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> committables = getExpectedCommittables(input, testHarness);
+		process(testHarness, input);
+		final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables);
+		testHarness.close();
+
+		return operatorSubtaskState;
+	}
+
+	private List<String> getExpectedCommittables(
+			List<String> committables,
+			OneInputStreamOperatorTestHarness<String, String> testHarness) {
+		final AbstractTestCommitter<String> committer = getCommitter(testHarness);
+		if (committer instanceof GlobalCommitter) {
+			final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer;
+			return Collections.singletonList(globalCommitter.combine(committables));
+		}
+		return committables;
+	}
+
+	// ------------------------ The sub class should override or implement following method ----------------------------
+
+	abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator(
+			TestSink<?, String, String, String> sink);
+
+	GlobalCommitter<String, String> createGlobalCommitter() {
+		throw new RuntimeException("Sub class should override this method");
+	}
+
+	Committer<String> createCommitter() {
+		throw new RuntimeException("Sub class should override this method");
+	}
+
+	GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() {
+		throw new RuntimeException("Sub class should override this method");
+	}
+
+	Committer<String> createAlwaysRetryCommitter() {
+		throw new RuntimeException("Sub class should override this method");
+	}
+
+	// -------------------------------- OneInputStreamOperatorTestHarness Factory --------------------------------------
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness(
+			OperatorSubtaskState operatorSubtaskState) throws Exception {
+
+		final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness =
+				new OneInputStreamOperatorTestHarness<>(
+						createStreamingCommitterOperator(
+								TestSink.create(
+										() -> DEFAULT_WRITER,
+										() -> Optional.of(createCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+										() -> Optional.of(createGlobalCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+						StringSerializer.INSTANCE);
+		operatorTestHarness.initializeState(operatorSubtaskState);
+		return operatorTestHarness;
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception {
+
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createAlwaysRetryCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createAlwaysRetryGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.empty(),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.empty())), StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Base class for our test {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter} implementations.
+	 */
+	abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> {
+
+		protected List<CommT> committedData;
+
+		private boolean isClosed;
+
+		public AbstractTestCommitter() {
+			this.committedData = new ArrayList<>();
+			this.isClosed = false;
+		}
+
+		public List<CommT> getCommittedData() {
+			return committedData;
+		}
+
+		@Override
+		public void close() throws Exception {
+			isClosed = true;
+		}
+
+		public boolean isClosed() {
+			return isClosed;
+		}
+	}
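
To make the role of this base class concrete, here is a self-contained sketch of how subclasses typically specialize it. The `Committer` interface is stubbed locally with the FLIP-143 shape (`commit` returns the committables that still need to be retried); the concrete subclasses `RecordingCommitter` and `AlwaysRetryCommitter` are hypothetical illustrations, not part of this PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class Main {
	// Stubbed, simplified shape of org.apache.flink.api.connector.sink.Committer
	// (FLIP-143 style): commit() returns the committables that must be retried.
	interface Committer<CommT> {
		List<CommT> commit(List<CommT> committables);
		void close();
	}

	// Mirrors the AbstractTestCommitter base class above.
	abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> {
		protected final List<CommT> committedData = new ArrayList<>();
		private boolean closed;

		public List<CommT> getCommittedData() {
			return committedData;
		}

		@Override
		public void close() {
			closed = true;
		}

		public boolean isClosed() {
			return closed;
		}
	}

	// Hypothetical concrete subclass: records everything, retries nothing.
	static final class RecordingCommitter extends AbstractTestCommitter<String> {
		@Override
		public List<String> commit(List<String> committables) {
			committedData.addAll(committables);
			return Collections.emptyList();
		}
	}

	// Hypothetical "always retry" subclass, in the spirit of
	// createAlwaysRetryCommitter(): returning every committable signals
	// that all of them must be retried.
	static final class AlwaysRetryCommitter extends AbstractTestCommitter<String> {
		@Override
		public List<String> commit(List<String> committables) {
			return committables;
		}
	}

	public static void main(String[] args) {
		RecordingCommitter ok = new RecordingCommitter();
		List<String> retries = ok.commit(Arrays.asList("lazy", "leaf"));
		System.out.println("recorded=" + ok.getCommittedData() + " retries=" + retries.size());

		AlwaysRetryCommitter retry = new AlwaysRetryCommitter();
		System.out.println("alwaysRetry=" + retry.commit(Arrays.asList("lazy", "leaf")).size());

		ok.close();
		System.out.println("closed=" + ok.isClosed());
	}
}
```

The streaming committer operator under test treats a non-empty retry list as unsupported, which is what the `doNotSupportRetry` test exercises.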
+
+	// ------------------------------------------- Utils for testing ---------------------------------------------------
+
+	/**
+	 * Process the input and return the pre-commit results.
+	 */
+	private List<String> preCommit(
+			OneInputStreamOperatorTestHarness<String, String> testHarness,
+			long checkpointId,
+			List<String> input) throws Exception {
+
+		process(testHarness, input);
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+		snapshot(testHarness, checkpointId, 1L, expectedCommittables);
+		return expectedCommittables;
+	}
+
+	private <InputT, CommT> void process(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			List<InputT> inputs) {
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness
+				.getOneInputOperator();
+		final List<StreamRecord<InputT>> inputStreamRecords = inputs
+				.stream()
+				.map(StreamRecord::new)
+				.collect(Collectors.toList());
+
+		inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement));
+
+		// verify the currently received committables
+		Assert.assertArrayEquals(
+				inputs.toArray(),
+				streamingCommitterOperator.getCurrentInputs().toArray());
+	}
+
+	<InputT, CommT> OperatorSubtaskState snapshot(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			long checkpointId,
+			long checkpointTimestamp,
+			List<CommT> expectedCommittables) throws Exception {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+
+		final List<CommT> expectedState =
+				streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream()
+						.flatMap(Collection::stream)
+						.collect(Collectors.toList());
+		expectedState.addAll(expectedCommittables);
+
+		final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>(
+				streamingCommitterOperator.getCommittablesPerCheckpoint());
+		final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList(
+				new StreamingCommitterState<>(expectedState));
+
+		expectedCommittablesPerCheckpoint.put(
+				checkpointId,
+				streamingCommitterOperator.getCurrentCommittables());
+
+		final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(
+				checkpointId,
+				checkpointTimestamp);
+
+		// verify the current inputs are cleaned up after checkpointing
+		assertThat(
+				streamingCommitterOperator.getCurrentInputs(),
+				is(
+						empty()));
+
+		// verify the state
+		final List<StreamingCommitterState<CommT>> operatorState =
+				CollectionUtil.iterableToList(streamingCommitterOperator.getState().get());
+
+		assertThat(

Review comment:
       might change to `assertEquals`
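
For illustration only, the suggested change would swap the Hamcrest matcher for JUnit's `Assert.assertEquals(expected, actual)`; note the argument order is reversed relative to `assertThat(actual, equalTo(expected))`. A stdlib-only sketch of the semantics (the local `assertEquals` helper below is a hypothetical stand-in for the JUnit method):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class Main {
	// Hypothetical stand-in for org.junit.Assert.assertEquals:
	// the expected value comes first, the actual value second.
	static void assertEquals(Object expected, Object actual) {
		if (!Objects.equals(expected, actual)) {
			throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
		}
	}

	public static void main(String[] args) {
		List<String> expectedState = Arrays.asList("lazy", "leaf");
		List<String> operatorState = Arrays.asList("lazy", "leaf");

		// Hamcrest form in the PR: assertThat(operatorState, equalTo(expectedState));
		// Suggested JUnit form:
		assertEquals(expectedState, operatorState);
		System.out.println("states equal");
	}
}
```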

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for tests of subclasses of {@link AbstractStreamingCommitterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(
+				getCommitter(testHarness).isClosed(),
+				is(true));
+	}
+
+	@Test
+	public void restoredFromMergedState() throws Exception {
+
+		final List<String> input1 = Arrays.asList("today", "whom");
+		final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1);
+
+		final List<String> input2 = Arrays.asList("future", "evil", "how");
+		final OperatorSubtaskState operatorSubtaskState2 = buildSubtaskState(input2);
+
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+
+		final OperatorSubtaskState mergedOperatorSubtaskState =
+				OneInputStreamOperatorTestHarness.repackageState(
+						operatorSubtaskState1,
+						operatorSubtaskState2);
+
+		testHarness.initializeState(mergedOperatorSubtaskState);
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator();
+
+		final List<String> expectedStates = new ArrayList<>();
+		expectedStates.addAll(getExpectedCommittables(input1, testHarness));
+		expectedStates.addAll(getExpectedCommittables(input2, testHarness));
+		assertThat(
+				expectedStates,
+				equalTo(
+						streamingCommitterOperator.getCurrentCommittables()));
+		testHarness.close();
+	}
+
+	@Test
+	public void commitMultipleStagesTogether() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedOutput = new ArrayList<>();
+		expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar")));
+
+		completeCheckpoint(testHarness, 3, expectedOutput);
+
+		testHarness.close();
+	}
+
+	/**
+	 * Process the input and return the snapshot.
+	 */
+	protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> committables = getExpectedCommittables(input, testHarness);
+		process(testHarness, input);
+		final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables);
+		testHarness.close();
+
+		return operatorSubtaskState;
+	}
+
+	private List<String> getExpectedCommittables(
+			List<String> committables,
+			OneInputStreamOperatorTestHarness<String, String> testHarness) {
+		final AbstractTestCommitter<String> committer = getCommitter(testHarness);
+		if (committer instanceof GlobalCommitter) {
+			final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer;
+			return Collections.singletonList(globalCommitter.combine(committables));
+		}
+		return committables;
+	}
+
+	// ---------------------- Methods that subclasses should override or implement --------------------------------------
+
+	abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator(
+			TestSink<?, String, String, String> sink);
+
+	GlobalCommitter<String, String> createGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createAlwaysRetryCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	// -------------------------------- OneInputStreamOperatorTestHarness Factory --------------------------------------
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness(
+			OperatorSubtaskState operatorSubtaskState) throws Exception {
+
+		final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness =
+				new OneInputStreamOperatorTestHarness<>(
+						createStreamingCommitterOperator(
+								TestSink.create(
+										() -> DEFAULT_WRITER,
+										() -> Optional.of(createCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+										() -> Optional.of(createGlobalCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+						StringSerializer.INSTANCE);
+		operatorTestHarness.initializeState(operatorSubtaskState);
+		return operatorTestHarness;
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception {
+
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createAlwaysRetryCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createAlwaysRetryGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.empty(),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.empty())), StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Base class for our test {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter} implementations.
+	 */
+	abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> {
+
+		protected List<CommT> committedData;
+
+		private boolean isClosed;
+
+		public AbstractTestCommitter() {
+			this.committedData = new ArrayList<>();
+			this.isClosed = false;
+		}
+
+		public List<CommT> getCommittedData() {
+			return committedData;
+		}
+
+		@Override
+		public void close() throws Exception {
+			isClosed = true;
+		}
+
+		public boolean isClosed() {
+			return isClosed;
+		}
+	}
+
+	// ------------------------------------------- Utils for testing ---------------------------------------------------
+
+	/**
+	 * Process the input and return the pre-commit results.
+	 */
+	private List<String> preCommit(
+			OneInputStreamOperatorTestHarness<String, String> testHarness,
+			long checkpointId,
+			List<String> input) throws Exception {
+
+		process(testHarness, input);
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+		snapshot(testHarness, checkpointId, 1L, expectedCommittables);
+		return expectedCommittables;
+	}
+
+	private <InputT, CommT> void process(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			List<InputT> inputs) {
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness
+				.getOneInputOperator();
+		final List<StreamRecord<InputT>> inputStreamRecords = inputs
+				.stream()
+				.map(StreamRecord::new)
+				.collect(Collectors.toList());
+
+		inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement));
+
+		// verify the currently received committables
+		Assert.assertArrayEquals(
+				inputs.toArray(),
+				streamingCommitterOperator.getCurrentInputs().toArray());
+	}
+
+	<InputT, CommT> OperatorSubtaskState snapshot(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			long checkpointId,
+			long checkpointTimestamp,
+			List<CommT> expectedCommittables) throws Exception {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+
+		final List<CommT> expectedState =
+				streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream()
+						.flatMap(Collection::stream)
+						.collect(Collectors.toList());
+		expectedState.addAll(expectedCommittables);
+
+		final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>(
+				streamingCommitterOperator.getCommittablesPerCheckpoint());
+		final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList(
+				new StreamingCommitterState<>(expectedState));
+
+		expectedCommittablesPerCheckpoint.put(
+				checkpointId,
+				streamingCommitterOperator.getCurrentCommittables());
+
+		final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(
+				checkpointId,
+				checkpointTimestamp);
+
+		// verify the current inputs are cleaned up after checkpointing
+		assertThat(
+				streamingCommitterOperator.getCurrentInputs(),
+				is(
+						empty()));
+
+		// verify the state
+		final List<StreamingCommitterState<CommT>> operatorState =
+				CollectionUtil.iterableToList(streamingCommitterOperator.getState().get());
+
+		assertThat(
+				operatorState,
+				equalTo(
+						expectedStreamingCommitterState));
+
+		// verify committables per checkpoint
+		assertThat(

Review comment:
       might change to `assertEquals`

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Base class for tests of subclasses of {@link AbstractStreamingCommitterOperator}.
+ */
+public abstract class StreamingCommitterTestBase extends TestLogger {
+
+	private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>();
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutSerializer() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void throwExceptionWithoutCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void doNotSupportRetry() throws Exception {
+		final List<String> input = Arrays.asList("lazy", "leaf");
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness();
+
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+
+		process(testHarness, input);
+		snapshot(testHarness, 1, 1, expectedCommittables);
+		completeCheckpoint(testHarness, 1, expectedCommittables);
+
+		testHarness.close();
+	}
+
+	@Test
+	public void closeCommitter() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+		testHarness.close();
+		assertThat(
+				getCommitter(testHarness).isClosed(),
+				is(true));
+	}
+
+	@Test
+	public void restoredFromMergedState() throws Exception {
+
+		final List<String> input1 = Arrays.asList("today", "whom");
+		final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1);
+
+		final List<String> input2 = Arrays.asList("future", "evil", "how");
+		final OperatorSubtaskState operatorSubtaskState2 = buildSubtaskState(input2);
+
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+
+		final OperatorSubtaskState mergedOperatorSubtaskState =
+				OneInputStreamOperatorTestHarness.repackageState(
+						operatorSubtaskState1,
+						operatorSubtaskState2);
+
+		testHarness.initializeState(mergedOperatorSubtaskState);
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator();
+
+		final List<String> expectedStates = new ArrayList<>();
+		expectedStates.addAll(getExpectedCommittables(input1, testHarness));
+		expectedStates.addAll(getExpectedCommittables(input2, testHarness));
+		assertThat(
+				expectedStates,
+				equalTo(
+						streamingCommitterOperator.getCurrentCommittables()));
+		testHarness.close();
+	}
+
+	@Test
+	public void commitMultipleStagesTogether() throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> expectedOutput = new ArrayList<>();
+		expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature")));
+		expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar")));
+
+		completeCheckpoint(testHarness, 3, expectedOutput);
+
+		testHarness.close();
+	}
+
+	/**
+	 * Process the input and return the snapshot.
+	 */
+	protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception {
+		final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness();
+		testHarness.initializeEmptyState();
+		testHarness.open();
+
+		final List<String> committables = getExpectedCommittables(input, testHarness);
+		process(testHarness, input);
+		final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables);
+		testHarness.close();
+
+		return operatorSubtaskState;
+	}
+
+	private List<String> getExpectedCommittables(
+			List<String> committables,
+			OneInputStreamOperatorTestHarness<String, String> testHarness) {
+		final AbstractTestCommitter<String> committer = getCommitter(testHarness);
+		if (committer instanceof GlobalCommitter) {
+			final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer;
+			return Collections.singletonList(globalCommitter.combine(committables));
+		}
+		return committables;
+	}
+
+	// ---------------------- Methods that subclasses should override or implement --------------------------------------
+
+	abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator(
+			TestSink<?, String, String, String> sink);
+
+	GlobalCommitter<String, String> createGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	Committer<String> createAlwaysRetryCommitter() {
+		throw new RuntimeException("Subclasses should override this method");
+	}
+
+	// -------------------------------- OneInputStreamOperatorTestHarness Factory --------------------------------------
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	protected OneInputStreamOperatorTestHarness<String, String> createTestHarness(
+			OperatorSubtaskState operatorSubtaskState) throws Exception {
+
+		final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness =
+				new OneInputStreamOperatorTestHarness<>(
+						createStreamingCommitterOperator(
+								TestSink.create(
+										() -> DEFAULT_WRITER,
+										() -> Optional.of(createCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+										() -> Optional.of(createGlobalCommitter()),
+										() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+						StringSerializer.INSTANCE);
+		operatorTestHarness.initializeState(operatorSubtaskState);
+		return operatorTestHarness;
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception {
+
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createAlwaysRetryCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.of(createAlwaysRetryGlobalCommitter()),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.of(createCommitter()),
+								() -> Optional.empty(),
+								() -> Optional.of(createGlobalCommitter()),
+								() -> Optional.empty())), StringSerializer.INSTANCE);
+	}
+
+	OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception {
+		return new OneInputStreamOperatorTestHarness<>(
+				createStreamingCommitterOperator(
+						TestSink.create(
+								() -> DEFAULT_WRITER,
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE),
+								() -> Optional.empty(),
+								() -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))),
+				StringSerializer.INSTANCE);
+	}
+
+	/**
+	 * Base class for our test {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter} implementations.
+	 */
+	abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> {
+
+		protected List<CommT> committedData;
+
+		private boolean isClosed;
+
+		public AbstractTestCommitter() {
+			this.committedData = new ArrayList<>();
+			this.isClosed = false;
+		}
+
+		public List<CommT> getCommittedData() {
+			return committedData;
+		}
+
+		@Override
+		public void close() throws Exception {
+			isClosed = true;
+		}
+
+		public boolean isClosed() {
+			return isClosed;
+		}
+	}
+
+	// ------------------------------------------- Utils for testing ---------------------------------------------------
+
+	/**
+	 * Process the input and return the pre-commit results.
+	 */
+	private List<String> preCommit(
+			OneInputStreamOperatorTestHarness<String, String> testHarness,
+			long checkpointId,
+			List<String> input) throws Exception {
+
+		process(testHarness, input);
+		final List<String> expectedCommittables = getExpectedCommittables(input, testHarness);
+		snapshot(testHarness, checkpointId, 1L, expectedCommittables);
+		return expectedCommittables;
+	}
+
+	private <InputT, CommT> void process(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			List<InputT> inputs) {
+
+		final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness
+				.getOneInputOperator();
+		final List<StreamRecord<InputT>> inputStreamRecords = inputs
+				.stream()
+				.map(StreamRecord::new)
+				.collect(Collectors.toList());
+
+		inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement));
+
+		// verify the currently received committables
+		Assert.assertArrayEquals(
+				inputs.toArray(),
+				streamingCommitterOperator.getCurrentInputs().toArray());
+	}
+
+	<InputT, CommT> OperatorSubtaskState snapshot(
+			OneInputStreamOperatorTestHarness<InputT, CommT> testHarness,
+			long checkpointId,
+			long checkpointTimestamp,
+			List<CommT> expectedCommittables) throws Exception {
+
+		final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator =
+				(AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator();
+
+		final List<CommT> expectedState =
+				streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream()
+						.flatMap(Collection::stream)
+						.collect(Collectors.toList());
+		expectedState.addAll(expectedCommittables);
+
+		final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>(
+				streamingCommitterOperator.getCommittablesPerCheckpoint());
+		final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList(
+				new StreamingCommitterState<>(expectedState));
+
+		expectedCommittablesPerCheckpoint.put(
+				checkpointId,
+				streamingCommitterOperator.getCurrentCommittables());
+
+		final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(
+				checkpointId,
+				checkpointTimestamp);
+
+		// verify the current inputs are cleaned up after checkpointing
+		assertThat(
+				streamingCommitterOperator.getCurrentInputs(),
+				is(

Review comment:
       No need to change line here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org