Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/22 06:17:51 UTC

[GitHub] [flink] gaoyunhaii opened a new pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

gaoyunhaii opened a new pull request #13740:
URL: https://github.com/apache/flink/pull/13740


   ## What is the purpose of the change
   
   This pull request implements a new file sink based on the new sink API.
   
   
   ## Brief change log
   
   af69ad89405936040b55d54acb0ef18e6141e5ad implements the sink.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
    - Unit tests for each component of the new sink.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **yes**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector:  **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af69ad89405936040b55d54acb0ef18e6141e5ad Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078) 
   * db5d2e491af1b183719c3365c57c3d4e5ee55b2a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714258749


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit af69ad89405936040b55d54acb0ef18e6141e5ad (Thu Oct 22 06:20:11 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-19758).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * e49d604917caf8326cdfb3af129946d7030dba9f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af69ad89405936040b55d54acb0ef18e6141e5ad Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * 6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886) 
   * e49d604917caf8326cdfb3af129946d7030dba9f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8940",
       "triggerID" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * e49d604917caf8326cdfb3af129946d7030dba9f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916) 
   * 90aacd961fbb0ed538fda229642a707f7afcca69 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8940) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * e49d604917caf8326cdfb3af129946d7030dba9f Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916) 
   * 90aacd961fbb0ed538fda229642a707f7afcca69 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] kl0u commented on a change in pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
kl0u commented on a change in pull request #13740:
URL: https://github.com/apache/flink/pull/13740#discussion_r515906902



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+
+/**
+ * A {@code SimpleVersionedSerializer} used to serialize the {@link FileWriterBucketState BucketState}.
+ */
+@Internal
+public class FileWriterBucketStateSerializer<BucketID>
+		implements SimpleVersionedSerializer<FileWriterBucketState<BucketID>> {
+
+	private static final int MAGIC_NUMBER = 0x1e764b79;
+
+	private final SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer;
+
+	private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
+
+	public FileWriterBucketStateSerializer(
+			SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+			SimpleVersionedSerializer<BucketID> bucketIdSerializer) {
+		this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(

Review comment:
       Why not statically import the `checkNotNull()` method:
   ```
   import static org.apache.flink.util.Preconditions.checkNotNull;
   ```
    so that you do not have to write out the full `Preconditions.checkNotNull()` each time?
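
    For illustration, a minimal sketch of the constructor with that static import in place (same fields and parameters as in the snippet above):
    ```java
    // assuming: import static org.apache.flink.util.Preconditions.checkNotNull;
    public FileWriterBucketStateSerializer(
            SimpleVersionedSerializer<InProgressFileRecoverable> inProgressFileRecoverableSerializer,
            SimpleVersionedSerializer<BucketID> bucketIdSerializer) {
        // with the static import, the call sites no longer need the Preconditions prefix
        this.inProgressFileRecoverableSerializer = checkNotNull(inProgressFileRecoverableSerializer);
        this.bucketIdSerializer = checkNotNull(bucketIdSerializer);
    }
    ```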

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+

Review comment:
       I think we are missing the `forBulkFormat()`? 
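
    As a rough sketch of what that could look like, mirroring `forRowFormat()` above (the `DefaultBulkFormatBuilder` name and the `BulkWriter.Factory` parameter are assumptions for illustration, not part of this diff):
    ```java
    // Hypothetical bulk-format counterpart to forRowFormat(); names are illustrative only.
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory) {
        return new DefaultBulkFormatBuilder<>(
                basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
    }
    ```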

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {

Review comment:
       I think we should add `checkState(bucketStates != null, "The retrieved state was null.");` or something like that here.
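
    For example, a minimal sketch of that guard at the top of `initializeState()` (reusing the `checkState` that is already statically imported in this file):
    ```java
    public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
        // fail fast if the framework hands us a null state list
        checkState(bucketStates != null, "The retrieved state was null.");

        for (FileWriterBucketState<BucketID> state : bucketStates) {
            // ... existing restore logic ...
        }
    }
    ```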

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.partCounter = 0;
+	}
+
+	/**
+	 * Constructor to restore a bucket from checkpointed state.
+	 */
+	private FileWriterBucket(
+			String uniqueId,
+			BucketWriter<IN, BucketID> partFileFactory,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			FileWriterBucketState<BucketID> bucketState,
+			OutputFileConfig outputFileConfig) throws IOException {
+
+		this(
+				bucketState.getBucketId(),
+				bucketState.getBucketPath(),
+				uniqueId,
+				partFileFactory,
+				rollingPolicy,
+				outputFileConfig);
+
+		restoreInProgressFile(bucketState);
+	}
+
+	private void restoreInProgressFile(FileWriterBucketState<BucketID> state) throws IOException {
+		if (!state.hasInProgressFileRecoverable()) {
+			return;
+		}
+
+		// we try to resume the previous in-progress file
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
+				state.getInProgressFileRecoverable();
+
+		if (bucketWriter.getProperties().supportsResume()) {
+			inProgressPart = bucketWriter.resumeInProgressFileFrom(
+					bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
+		} else {
+			pendingFiles.add(inProgressFileRecoverable);
+		}
+	}
+
+	public BucketID getBucketId() {
+		return bucketId;
+	}
+
+	public Path getBucketPath() {
+		return bucketPath;
+	}
+
+	public long getPartCounter() {
+		return partCounter;
+	}
+
+	public boolean isActive() {
+		return inProgressPart != null || inProgressFileToCleanup != null || pendingFiles.size() > 0;
+	}
+
+	void merge(final FileWriterBucket<IN, BucketID> bucket) throws IOException {
+		checkNotNull(bucket);
+		checkState(Objects.equals(bucket.bucketPath, bucketPath));
+
+		bucket.closePartFile();
+		pendingFiles.addAll(bucket.pendingFiles);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Merging buckets for bucket id={}", bucketId);
+		}
+	}
+
+	void write(IN element) throws IOException {
+		long now = System.currentTimeMillis();
+		if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(
+						"closing in-progress part file for bucket id={} due to element {}.",
+						bucketId,
+						element);
+			}
+
+			inProgressPart = rollPartFile(now);
+		}
+
+		inProgressPart.write(element, now);
+	}
+
+	List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		if (inProgressPart != null && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart)
+				|| flush)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(
+						"Closing in-progress part file for bucket id={} on checkpoint.",
+						bucketId);
+			}
+			closePartFile();
+		}
+
+		List<FileSinkCommittable> committables = new ArrayList<>();
+		pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+		pendingFiles.clear();
+
+		if (inProgressFileToCleanup != null) {
+			committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+			inProgressFileToCleanup = null;
+		}
+
+		return committables;
+	}
+
+	FileWriterBucketState<BucketID> snapshotState() throws IOException {
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
+		long inProgressFileCreationTime = Long.MAX_VALUE;
+
+		if (inProgressPart != null) {
+			inProgressFileRecoverable = inProgressPart.persist();
+			inProgressFileToCleanup = inProgressFileRecoverable;
+			inProgressFileCreationTime = inProgressPart.getCreationTime();
+		}
+
+		return new FileWriterBucketState<>(
+				bucketId,
+				bucketPath,
+				inProgressFileCreationTime,
+				inProgressFileRecoverable);
+	}
+
+	private InProgressFileWriter<IN, BucketID> rollPartFile(long currentTime) throws IOException {
+		closePartFile();
+
+		final Path partFilePath = assembleNewPartPath();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Opening new part file \"{}\" for bucket id={}.",
+					partFilePath.getName(), bucketId);
+		}
+
+		return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
+	}
+
+	/**
+	 * Constructor a new PartPath and increment the partCounter.
+	 */
+	private Path assembleNewPartPath() {
+		long currentPartCounter = partCounter++;
+		return new Path(
+				bucketPath,
+				outputFileConfig.getPartPrefix() + '-' + uniqueId + '-' + currentPartCounter
+						+ outputFileConfig.getPartSuffix());
+	}
+
+	private void closePartFile() throws IOException {
+		if (inProgressPart != null) {
+			InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = inProgressPart.closeForCommit();
+			pendingFiles.add(pendingFileRecoverable);
+			inProgressPart = null;
+		}
+	}
+
+	void disposePartFile() {
+		if (inProgressPart != null) {
+			inProgressPart.dispose();
+		}
+	}
+
+	// --------------------------- Testing Methods -----------------------------
+
+	@VisibleForTesting
+	public String getUniqueId() {
+		return uniqueId;
+	}
+
+	@Nullable
+	@VisibleForTesting
+	InProgressFileWriter<IN, BucketID> getInProgressPart() {
+		return inProgressPart;
+	}
+

Review comment:
       This also seems to be visible only for testing, so why not add the `@VisibleForTesting` annotation?
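
    For illustration only (the concrete method sits below the quoted range, so this just shows the suggestion on a test-only accessor of this class; the method name is hypothetical):
    ```java
    @VisibleForTesting
    List<InProgressFileWriter.PendingFileRecoverable> getPendingFiles() {
        return pendingFiles;
    }
    ```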

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+
+import java.io.IOException;
+
+/**
+ * Versioned serializer for {@link FileSinkCommittable}.
+ */
+@Internal
+public class FileSinkCommittableSerializer
+		implements SimpleVersionedSerializer<FileSinkCommittable> {
+
+	private static final int MAGIC_NUMBER = 0x1e765c80;
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileSerializer;
+
+	private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileSerializer;
+
+	public FileSinkCommittableSerializer(
+			SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileSerializer,
+			SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileSerializer) {
+		this.pendingFileSerializer = pendingFileSerializer;

Review comment:
       I would also add null checks here.
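
    For example, a minimal sketch of the constructor with the suggested checks (assuming the usual `org.apache.flink.util.Preconditions` helper):
    ```java
    // assuming: import static org.apache.flink.util.Preconditions.checkNotNull;
    public FileSinkCommittableSerializer(
            SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileSerializer,
            SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileSerializer) {
        // reject null serializers eagerly instead of failing later during (de)serialization
        this.pendingFileSerializer = checkNotNull(pendingFileSerializer);
        this.inProgressFileSerializer = checkNotNull(inProgressFileSerializer);
    }
    ```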

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {
+			BucketID bucketId = state.getBucketId();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Restoring: {}", state);
+			}
+
+			FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					bucketWriter,
+					rollingPolicy,
+					state,
+					outputFileConfig);
+
+			updateActiveBucketId(bucketId, restoredBucket);
+		}
+	}
+
+	private void updateActiveBucketId(
+			BucketID bucketId,
+			FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+		final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket != null) {
+			bucket.merge(restoredBucket);
+		} else {
+			activeBuckets.put(bucketId, restoredBucket);
+		}
+	}
+
+	@Override
+	public void write(IN element, Context context) throws IOException {
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark());
+
+		final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+		final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(element);
+	}
+
+	@Override
+	public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		List<FileSinkCommittable> committables = new ArrayList<>();
+
+		// Every time before we prepare commit, we first check and remove the inactive
+		// buckets. Checking the activeness right before pre-committing avoids re-creating
+		// the bucket every time if the bucket uses OnCheckpointingRollingPolicy.
+		Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+		while (activeBucketIt.hasNext()) {

Review comment:
       Shouldn't we check if a bucket is active _after_ we send the committables? A bucket may only have pending files, right? So after sending them, it may be empty and we can remove it. WDYT?
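
       A minimal sketch of that reordering, reusing the fields from the quoted `FileWriter` (purely illustrative, not part of this PR):

```java
@Override
public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
	List<FileSinkCommittable> committables = new ArrayList<>();

	Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
			activeBuckets.entrySet().iterator();
	while (activeBucketIt.hasNext()) {
		FileWriterBucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
		// First collect the committables; this may turn the in-progress part into a
		// pending file and drain the pending files of the bucket.
		committables.addAll(bucket.prepareCommit(flush));
		// Only afterwards decide whether the bucket has become inactive and can be dropped.
		if (!bucket.isActive()) {
			activeBucketIt.remove();
		}
	}

	return committables;
}
```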

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see which bucket this element should be written to.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.partCounter = 0;
+	}
+
+	/**
+	 * Constructor to restore a bucket from checkpointed state.
+	 */
+	private FileWriterBucket(
+			String uniqueId,
+			BucketWriter<IN, BucketID> partFileFactory,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			FileWriterBucketState<BucketID> bucketState,
+			OutputFileConfig outputFileConfig) throws IOException {
+
+		this(
+				bucketState.getBucketId(),
+				bucketState.getBucketPath(),
+				uniqueId,
+				partFileFactory,
+				rollingPolicy,
+				outputFileConfig);
+
+		restoreInProgressFile(bucketState);
+	}
+
+	private void restoreInProgressFile(FileWriterBucketState<BucketID> state) throws IOException {
+		if (!state.hasInProgressFileRecoverable()) {
+			return;
+		}
+
+		// we try to resume the previous in-progress file
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
+				state.getInProgressFileRecoverable();
+
+		if (bucketWriter.getProperties().supportsResume()) {
+			inProgressPart = bucketWriter.resumeInProgressFileFrom(
+					bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
+		} else {
+			pendingFiles.add(inProgressFileRecoverable);
+		}
+	}
+
+	public BucketID getBucketId() {
+		return bucketId;
+	}
+
+	public Path getBucketPath() {
+		return bucketPath;
+	}
+
+	public long getPartCounter() {
+		return partCounter;
+	}
+
+	public boolean isActive() {
+		return inProgressPart != null || inProgressFileToCleanup != null || pendingFiles.size() > 0;
+	}
+
+	void merge(final FileWriterBucket<IN, BucketID> bucket) throws IOException {
+		checkNotNull(bucket);
+		checkState(Objects.equals(bucket.bucketPath, bucketPath));
+
+		bucket.closePartFile();
+		pendingFiles.addAll(bucket.pendingFiles);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Merging buckets for bucket id={}", bucketId);
+		}
+	}
+
+	void write(IN element) throws IOException {
+		long now = System.currentTimeMillis();
+		if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(
+						"closing in-progress part file for bucket id={} due to element {}.",

Review comment:
       I think that here it should be "Opening new in-progress part file for..." because if there is no part file, then we are not closing anything.
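
       For illustration, the branch with the reworded message could read as follows (the `rollPartFile` helper is assumed from the analogous `Bucket#write` and is not visible in the quoted hunk):

```java
if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
	if (LOG.isDebugEnabled()) {
		// We may be opening the very first part file of this bucket, so "opening"
		// describes both cases, whereas "closing" only fits when a part already exists.
		LOG.debug(
				"Opening new in-progress part file for bucket id={} due to element {}.",
				bucketId,
				element);
	}
	inProgressPart = rollPartFile(now); // assumed helper that closes the old part (if any) and opens a new one
}
```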

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any

Review comment:
       `Exception` -> `IOException`
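
       That is, the tag would then read:

```java
 * @throws IOException if anything goes wrong during retrieving the state or restoring/committing of any
 * 		in-progress/pending part files
```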

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {
+			BucketID bucketId = state.getBucketId();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Restoring: {}", state);
+			}
+
+			FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					bucketWriter,
+					rollingPolicy,
+					state,
+					outputFileConfig);
+
+			updateActiveBucketId(bucketId, restoredBucket);
+		}
+	}
+
+	private void updateActiveBucketId(
+			BucketID bucketId,
+			FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+		final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket != null) {
+			bucket.merge(restoredBucket);
+		} else {
+			activeBuckets.put(bucketId, restoredBucket);
+		}
+	}
+
+	@Override
+	public void write(IN element, Context context) throws IOException {
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark());
+
+		final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+		final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(element);
+	}
+
+	@Override
+	public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		List<FileSinkCommittable> committables = new ArrayList<>();
+
+		// Every time before we prepare commit, we first check and remove the inactive
+		// buckets. Checking the activeness right before pre-committing avoids re-creating
+		// the bucket every time if the bucket uses OnCheckpointingRollingPolicy.
+		Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+		while (activeBucketIt.hasNext()) {
+			Map.Entry<BucketID, FileWriterBucket<IN, BucketID>> entry = activeBucketIt.next();
+			if (!entry.getValue().isActive()) {
+				activeBucketIt.remove();
+			} else {
+				committables.addAll(entry.getValue().prepareCommit(flush));
+			}
+		}
+
+		return committables;
+	}
+
+	@Override
+	public List<FileWriterBucketState<BucketID>> snapshotState() throws IOException {
+		checkState(
+				bucketWriter != null && bucketStateSerializer != null,
+				"sink has not been initialized");
+
+		List<FileWriterBucketState<BucketID>> state = new ArrayList<>();
+		for (FileWriterBucket<IN, BucketID> bucket : activeBuckets.values()) {
+			state.add(bucket.snapshotState());
+		}
+
+		return state;
+	}
+
+	private FileWriterBucket<IN, BucketID> getOrCreateBucketForBucketId(BucketID bucketId) throws IOException {
+		FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket == null) {
+			final Path bucketPath = assembleBucketPath(bucketId);
+			bucket = bucketFactory.getNewBucket(
+					bucketId,
+					bucketPath,
+					bucketWriter,
+					rollingPolicy,
+					outputFileConfig);
+			activeBuckets.put(bucketId, bucket);
+		}
+		return bucket;
+	}
+

Review comment:
       Missing `@Override`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see which bucket this element should be written to.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);

Review comment:
       Why not put `RandomStringUtils.randomAlphanumeric(32)` here? Do we need to set it deterministically somewhere for testing?
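
       A hypothetical variant of the creating constructor that generates the id in place; tests needing a deterministic id would then have to inject it through a dedicated `@VisibleForTesting` seam:

```java
private FileWriterBucket(
		BucketID bucketId,
		Path bucketPath,
		BucketWriter<IN, BucketID> bucketWriter,
		RollingPolicy<IN, BucketID> rollingPolicy,
		OutputFileConfig outputFileConfig) {
	this.bucketId = checkNotNull(bucketId);
	this.bucketPath = checkNotNull(bucketPath);
	// Generated here instead of being handed in by the bucket factory.
	this.uniqueId = RandomStringUtils.randomAlphanumeric(32);
	this.bucketWriter = checkNotNull(bucketWriter);
	this.rollingPolicy = checkNotNull(rollingPolicy);
	this.outputFileConfig = checkNotNull(outputFileConfig);
	this.partCounter = 0;
}
```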

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.committer;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Committer implementation for {@link FileSink}.
+ */
+public class FileCommitter implements Committer<FileSinkCommittable> {
+	private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
+	private final BucketWriter<?, ?> bucketWriter;
+
+	public FileCommitter(BucketWriter<?, ?> bucketWriter) {
+		this.bucketWriter = checkNotNull(bucketWriter);
+	}
+
+	@Override
+	public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)  {
+		List<FileSinkCommittable> needRetry = new ArrayList<>();
+		for (FileSinkCommittable committable : committables) {
+			if (committable.hasPendingFile()) {
+				// We should always use commitAfterRecovery which contains additional checks.
+				try {
+					bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
+				} catch (IOException e) {
+					LOG.error("Failed to commit {}", committable.getPendingFile());
+					needRetry.add(committable);
+				}
+			}
+
+			if (committable.hasInProgressFileToCleanup()) {
+				try {
+					bucketWriter.cleanupInProgressFileRecoverable(committable.getInProgressFileToCleanup());
+				} catch (IOException e) {
+					LOG.error("Failed to cleanup {}", committable.getInProgressFileToCleanup());
+					needRetry.add(committable);

Review comment:
       Same here.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.committer;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Committer implementation for {@link FileSink}.
+ */
+public class FileCommitter implements Committer<FileSinkCommittable> {
+	private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
+	private final BucketWriter<?, ?> bucketWriter;
+
+	public FileCommitter(BucketWriter<?, ?> bucketWriter) {
+		this.bucketWriter = checkNotNull(bucketWriter);
+	}
+
+	@Override
+	public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)  {
+		List<FileSinkCommittable> needRetry = new ArrayList<>();
+		for (FileSinkCommittable committable : committables) {
+			if (committable.hasPendingFile()) {
+				// We should always use commitAfterRecovery which contains additional checks.
+				try {
+					bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
+				} catch (IOException e) {
+					LOG.error("Failed to commit {}", committable.getPendingFile());
+					needRetry.add(committable);

Review comment:
       Currently this will fail because the operators throw an exception. So for now we may have to simply return an empty list and open another JIRA as a reminder to enable retry once the operators allow it. The other option would be to add a flag like `supportsRetry` that is always `false` for now and can be enabled in the future.
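
       A rough sketch of the first option, assuming the `Committer#commit` signature lets the `IOException` propagate and fail the job (`java.util.Collections` would need to be imported):

```java
@Override
public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) throws IOException {
	for (FileSinkCommittable committable : committables) {
		if (committable.hasPendingFile()) {
			// Always use commitAfterRecovery, which contains the additional checks.
			bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
		}
		if (committable.hasInProgressFileToCleanup()) {
			bucketWriter.cleanupInProgressFileRecoverable(committable.getInProgressFileToCleanup());
		}
	}
	// No retry support yet: failures surface as exceptions and nothing is handed back for retry.
	return Collections.emptyList();
}
```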

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"newBuilderWithBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"newBuilderWithBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {

Review comment:
       This can become `package-private`.
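
       A minimal sketch of what that could look like (my assumption: the abstract
       declarations in `BucketsBuilder` would be narrowed to package-private as well,
       since a Java override may not reduce the visibility it inherits, and all of
       these nested classes live in the same file and package):

       ```java
       // In BucketsBuilder: package-private (default) visibility instead of protected.
       abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;

       abstract FileCommitter createCommitter() throws IOException;

       // In RowFormatBuilder / BulkFormatBuilder: the overrides drop the public modifier accordingly.
       @Override
       FileCommitter createCommitter() throws IOException {
           return new FileCommitter(createBucketWriter());
       }
       ```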

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {
+			BucketWriter<IN, BucketID> bucketWriter = createBucketWriter();
+
+			return new FileSinkCommittableSerializer(
+					bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+					bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
+		}
+
+		private BucketWriter<IN, BucketID> createBucketWriter() throws IOException {
+			return new RowWiseBucketWriter<>(
+					FileSystem.get(basePath.toUri()).createRecoverableWriter(),
+					encoder);
+		}
+	}
+
+	/**
+	 * Builder for the vanilla {@link FileSink} using a row format.
+	 */
+	public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
+		private static final long serialVersionUID = -8503344257202146718L;
+
+		private DefaultRowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, String> bucketAssigner) {
+			super(basePath, encoder, bucketAssigner);
+		}
+	}
+
+	/**
+	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+	 */
+	@PublicEvolving
+	public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private BulkWriter.Factory<IN> writerFactory;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner) {
+			this(
+					basePath,
+					writerFactory,
+					assigner,
+					OnCheckpointRollingPolicy.build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner,
+				CheckpointRollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.writerFactory = writerFactory;
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			this.rollingPolicy = Preconditions.checkNotNull(policy);
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+			this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+			return self();
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public BulkFormatBuilder<IN, BucketID, ? extends BulkFormatBuilder<IN, BucketID, ?>> withNewBucketAssigner(
+				BucketAssigner<IN, BucketID> assigner) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssigner() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new BulkFormatBuilder<>(
+					basePath,
+					writerFactory,
+					Preconditions.checkNotNull(assigner),
+					rollingPolicy,
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {
+			BucketWriter<IN, BucketID> bucketWriter = createBucketWriter();
+
+			return new FileSinkCommittableSerializer(
+					bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+					bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
+		}
+
+		private BucketWriter<IN, BucketID> createBucketWriter() throws IOException {
+			return new RowWiseBucketWriter<>(
+					FileSystem.get(basePath.toUri()).createRecoverableWriter(),
+					encoder);
+		}
+	}
+
+	/**
+	 * Builder for the vanilla {@link FileSink} using a row format.
+	 */
+	public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
+		private static final long serialVersionUID = -8503344257202146718L;
+
+		private DefaultRowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, String> bucketAssigner) {
+			super(basePath, encoder, bucketAssigner);
+		}
+	}
+
+	/**
+	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+	 */
+	@PublicEvolving
+	public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private BulkWriter.Factory<IN> writerFactory;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner) {
+			this(
+					basePath,
+					writerFactory,
+					assigner,
+					OnCheckpointRollingPolicy.build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner,
+				CheckpointRollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.writerFactory = writerFactory;
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			this.rollingPolicy = Preconditions.checkNotNull(policy);
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+			this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+			return self();
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public BulkFormatBuilder<IN, BucketID, ? extends BulkFormatBuilder<IN, BucketID, ?>> withNewBucketAssigner(
+				BucketAssigner<IN, BucketID> assigner) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssigner() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new BulkFormatBuilder<>(
+					basePath,
+					writerFactory,
+					Preconditions.checkNotNull(assigner),
+					rollingPolicy,
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {
+			BucketWriter<IN, BucketID> bucketWriter = createBucketWriter();
+
+			return new FileSinkCommittableSerializer(
+					bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+					bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
+		}
+
+		private BucketWriter<IN, BucketID> createBucketWriter() throws IOException {
+			return new RowWiseBucketWriter<>(
+					FileSystem.get(basePath.toUri()).createRecoverableWriter(),
+					encoder);
+		}
+	}
+
+	/**
+	 * Builder for the vanilla {@link FileSink} using a row format.
+	 */
+	public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
+		private static final long serialVersionUID = -8503344257202146718L;
+
+		private DefaultRowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, String> bucketAssigner) {
+			super(basePath, encoder, bucketAssigner);
+		}
+	}
+
+	/**
+	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+	 */
+	@PublicEvolving
+	public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private BulkWriter.Factory<IN> writerFactory;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner) {
+			this(
+					basePath,
+					writerFactory,
+					assigner,
+					OnCheckpointRollingPolicy.build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner,
+				CheckpointRollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.writerFactory = writerFactory;
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			this.rollingPolicy = Preconditions.checkNotNull(policy);
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+			this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+			return self();
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public BulkFormatBuilder<IN, BucketID, ? extends BulkFormatBuilder<IN, BucketID, ?>> withNewBucketAssigner(
+				BucketAssigner<IN, BucketID> assigner) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssigner() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new BulkFormatBuilder<>(
+					basePath,
+					writerFactory,
+					Preconditions.checkNotNull(assigner),
+					rollingPolicy,
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {

Review comment:
       This can become `package-private`.
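
       Narrowing these hooks should not change anything for end users, who only touch
       the public builder entry points and the resulting sink. A rough usage sketch
       (the output path and the `stream` variable are placeholders, and it assumes the
       new `DataStream#sinkTo` entry point from FLIP-143):

       ```java
       // Hypothetical usage: a row-format FileSink handed to the new Sink API.
       FileSink<String, String> sink = FileSink
               .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
               .withRollingPolicy(DefaultRollingPolicy.builder().build())
               .build();

       stream.sinkTo(sink); // `stream` is assumed to be a DataStream<String>
       ```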

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {
+			return new FileWriter<>(
+					basePath,
+					bucketAssigner,
+					bucketFactory,
+					createBucketWriter(),
+					rollingPolicy,
+					outputFileConfig);
+		}
+
+		@Override
+		public FileCommitter createCommitter() throws IOException {
+			return new FileCommitter(createBucketWriter());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException {
+			return new FileWriterBucketStateSerializer<>(
+					createBucketWriter()
+							.getProperties()
+							.getInProgressFileRecoverableSerializer(),
+					bucketAssigner.getSerializer());
+		}
+
+		@Override
+		public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException {
+			BucketWriter<IN, BucketID> bucketWriter = createBucketWriter();
+
+			return new FileSinkCommittableSerializer(
+					bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+					bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
+		}
+
+		private BucketWriter<IN, BucketID> createBucketWriter() throws IOException {
+			return new RowWiseBucketWriter<>(
+					FileSystem.get(basePath.toUri()).createRecoverableWriter(),
+					encoder);
+		}
+	}
+
+	/**
+	 * Builder for the vanilla {@link FileSink} using a row format.
+	 */
+	public static final class DefaultRowFormatBuilder<IN> extends RowFormatBuilder<IN, String, DefaultRowFormatBuilder<IN>> {
+		private static final long serialVersionUID = -8503344257202146718L;
+
+		private DefaultRowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, String> bucketAssigner) {
+			super(basePath, encoder, bucketAssigner);
+		}
+	}
+
+	/**
+	 * A builder for configuring the sink for bulk-encoding formats, e.g. Parquet/ORC.
+	 */
+	@PublicEvolving
+	public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private BulkWriter.Factory<IN> writerFactory;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner) {
+			this(
+					basePath,
+					writerFactory,
+					assigner,
+					OnCheckpointRollingPolicy.build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected BulkFormatBuilder(
+				Path basePath,
+				BulkWriter.Factory<IN> writerFactory,
+				BucketAssigner<IN, BucketID> assigner,
+				CheckpointRollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = Preconditions.checkNotNull(basePath);
+			this.writerFactory = writerFactory;
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			this.rollingPolicy = Preconditions.checkNotNull(policy);
+			this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
+			this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = Preconditions.checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy) {
+			this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+			return self();
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public BulkFormatBuilder<IN, BucketID, ? extends BulkFormatBuilder<IN, BucketID, ?>> withNewBucketAssigner(
+				BucketAssigner<IN, BucketID> assigner) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssigner() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new BulkFormatBuilder<>(
+					basePath,
+					writerFactory,
+					Preconditions.checkNotNull(assigner),
+					rollingPolicy,
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       This can become `package-private`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws IOException if anything goes wrong during retrieving the state or restoring/committing of any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {
+			BucketID bucketId = state.getBucketId();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Restoring: {}", state);
+			}
+
+			FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					bucketWriter,
+					rollingPolicy,
+					state,
+					outputFileConfig);
+
+			updateActiveBucketId(bucketId, restoredBucket);
+		}
+	}
+
+	private void updateActiveBucketId(
+			BucketID bucketId,
+			FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+		final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket != null) {
+			bucket.merge(restoredBucket);
+		} else {
+			activeBuckets.put(bucketId, restoredBucket);
+		}
+	}
+
+	@Override
+	public void write(IN element, Context context) throws IOException {
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark());
+
+		final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+		final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(element);
+	}
+
+	@Override
+	public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		List<FileSinkCommittable> committables = new ArrayList<>();
+
+		// Every time before we prepare a commit, we first check and remove the inactive
+		// buckets. Checking the activeness right before pre-committing avoids re-creating
+		// the bucket every time if the bucket uses OnCheckpointRollingPolicy.
+		Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+		while (activeBucketIt.hasNext()) {

Review comment:
       The only problem I can find is memory consumption, nothing more. Do you think this is not an issue?

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       The interface is the `BucketsBuilder`, right? Can't we change that to `package-private` as well? It is a newly introduced interface.
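       A rough, purely illustrative sketch of that change, trimmed to the abstract hooks (all builders stay nested in `FileSink`, so package-private methods remain overridable from the same package):

```java
// Hypothetical sketch: the abstract hooks lose `protected` and become package-private,
// so only classes in org.apache.flink.connector.file.sink can implement them.
// Fields, self() and the serialVersionUID are omitted for brevity.
@Internal
private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
		implements Serializable {

	abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;

	abstract FileCommitter createCommitter() throws IOException;

	abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;

	abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
}
```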




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af69ad89405936040b55d54acb0ef18e6141e5ad UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] gaoyunhaii commented on a change in pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
gaoyunhaii commented on a change in pull request #13740:
URL: https://github.com/apache/flink/pull/13740#discussion_r516044747



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);

Review comment:
       Currently we have no requirement to set it directly. I moved the parameter so that it is initialized inside the `FileWriterBucket`.
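       Roughly, the constructor might then look like the sketch below (illustrative only; how the id is generated and its length are assumptions, not necessarily the actual change):

```java
// Hypothetical sketch: the bucket derives its own uniqueId instead of taking it
// as a constructor argument. The generator and length are placeholders.
private FileWriterBucket(
		BucketID bucketId,
		Path bucketPath,
		BucketWriter<IN, BucketID> bucketWriter,
		RollingPolicy<IN, BucketID> rollingPolicy,
		OutputFileConfig outputFileConfig) {
	this.bucketId = checkNotNull(bucketId);
	this.bucketPath = checkNotNull(bucketPath);
	this.uniqueId = RandomStringUtils.randomAlphanumeric(16);
	this.bucketWriter = checkNotNull(bucketWriter);
	this.rollingPolicy = checkNotNull(rollingPolicy);
	this.outputFileConfig = checkNotNull(outputFileConfig);
	this.partCounter = 0;
}
```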

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);

Review comment:
       Currently we have no requirement to set it directly. I moved the parameter so that it is initialized inside the `FileWriterBucket`.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+	private final BucketID bucketId;
+
+	private final Path bucketPath;
+
+	private final String uniqueId;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	private final OutputFileConfig outputFileConfig;
+
+	private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+	private long partCounter;
+
+	@Nullable
+	private InProgressFileRecoverable inProgressFileToCleanup;
+
+	@Nullable
+	private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+	/**
+	 * Constructor to create a new empty bucket.
+	 */
+	private FileWriterBucket(
+			BucketID bucketId,
+			Path bucketPath,
+			String uniqueId,
+			BucketWriter<IN, BucketID> bucketWriter,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			OutputFileConfig outputFileConfig) {
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
+		this.uniqueId = checkNotNull(uniqueId);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.partCounter = 0;
+	}
+
+	/**
+	 * Constructor to restore a bucket from checkpointed state.
+	 */
+	private FileWriterBucket(
+			String uniqueId,
+			BucketWriter<IN, BucketID> partFileFactory,
+			RollingPolicy<IN, BucketID> rollingPolicy,
+			FileWriterBucketState<BucketID> bucketState,
+			OutputFileConfig outputFileConfig) throws IOException {
+
+		this(
+				bucketState.getBucketId(),
+				bucketState.getBucketPath(),
+				uniqueId,
+				partFileFactory,
+				rollingPolicy,
+				outputFileConfig);
+
+		restoreInProgressFile(bucketState);
+	}
+
+	private void restoreInProgressFile(FileWriterBucketState<BucketID> state) throws IOException {
+		if (!state.hasInProgressFileRecoverable()) {
+			return;
+		}
+
+		// we try to resume the previous in-progress file
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
+				state.getInProgressFileRecoverable();
+
+		if (bucketWriter.getProperties().supportsResume()) {
+			inProgressPart = bucketWriter.resumeInProgressFileFrom(
+					bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
+		} else {
+			pendingFiles.add(inProgressFileRecoverable);
+		}
+	}
+
+	public BucketID getBucketId() {
+		return bucketId;
+	}
+
+	public Path getBucketPath() {
+		return bucketPath;
+	}
+
+	public long getPartCounter() {
+		return partCounter;
+	}
+
+	public boolean isActive() {
+		return inProgressPart != null || inProgressFileToCleanup != null || pendingFiles.size() > 0;
+	}
+
+	void merge(final FileWriterBucket<IN, BucketID> bucket) throws IOException {
+		checkNotNull(bucket);
+		checkState(Objects.equals(bucket.bucketPath, bucketPath));
+
+		bucket.closePartFile();
+		pendingFiles.addAll(bucket.pendingFiles);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Merging buckets for bucket id={}", bucketId);
+		}
+	}
+
+	void write(IN element) throws IOException {
+		long now = System.currentTimeMillis();
+		if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(
+						"closing in-progress part file for bucket id={} due to element {}.",
+						bucketId,
+						element);
+			}
+
+			inProgressPart = rollPartFile(now);
+		}
+
+		inProgressPart.write(element, now);
+	}
+
+	List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		if (inProgressPart != null && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart)
+				|| flush)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug(
+						"Closing in-progress part file for bucket id={} on checkpoint.",
+						bucketId);
+			}
+			closePartFile();
+		}
+
+		List<FileSinkCommittable> committables = new ArrayList<>();
+		pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+		pendingFiles.clear();
+
+		if (inProgressFileToCleanup != null) {
+			committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+			inProgressFileToCleanup = null;
+		}
+
+		return committables;
+	}
+
+	FileWriterBucketState<BucketID> snapshotState() throws IOException {
+		InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
+		long inProgressFileCreationTime = Long.MAX_VALUE;
+
+		if (inProgressPart != null) {
+			inProgressFileRecoverable = inProgressPart.persist();
+			inProgressFileToCleanup = inProgressFileRecoverable;
+			inProgressFileCreationTime = inProgressPart.getCreationTime();
+		}
+
+		return new FileWriterBucketState<>(
+				bucketId,
+				bucketPath,
+				inProgressFileCreationTime,
+				inProgressFileRecoverable);
+	}
+
+	private InProgressFileWriter<IN, BucketID> rollPartFile(long currentTime) throws IOException {
+		closePartFile();
+
+		final Path partFilePath = assembleNewPartPath();
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Opening new part file \"{}\" for bucket id={}.",
+					partFilePath.getName(), bucketId);
+		}
+
+		return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
+	}
+
+	/**
+	 * Constructs a new PartPath and increments the partCounter.
+	 */
+	private Path assembleNewPartPath() {
+		long currentPartCounter = partCounter++;
+		return new Path(
+				bucketPath,
+				outputFileConfig.getPartPrefix() + '-' + uniqueId + '-' + currentPartCounter
+						+ outputFileConfig.getPartSuffix());
+	}
+
+	private void closePartFile() throws IOException {
+		if (inProgressPart != null) {
+			InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = inProgressPart.closeForCommit();
+			pendingFiles.add(pendingFileRecoverable);
+			inProgressPart = null;
+		}
+	}
+
+	void disposePartFile() {
+		if (inProgressPart != null) {
+			inProgressPart.dispose();
+		}
+	}
+
+	// --------------------------- Testing Methods -----------------------------
+
+	@VisibleForTesting
+	public String getUniqueId() {
+		return uniqueId;
+	}
+
+	@Nullable
+	@VisibleForTesting
+	InProgressFileWriter<IN, BucketID> getInProgressPart() {
+		return inProgressPart;
+	}
+

Review comment:
       Added `@VisibleForTesting` for all the testing methods.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws IOException if anything goes wrong during retrieving the state or restoring/committing of any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {
+			BucketID bucketId = state.getBucketId();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Restoring: {}", state);
+			}
+
+			FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					bucketWriter,
+					rollingPolicy,
+					state,
+					outputFileConfig);
+
+			updateActiveBucketId(bucketId, restoredBucket);
+		}
+	}
+
+	private void updateActiveBucketId(
+			BucketID bucketId,
+			FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+		final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket != null) {
+			bucket.merge(restoredBucket);
+		} else {
+			activeBuckets.put(bucketId, restoredBucket);
+		}
+	}
+
+	@Override
+	public void write(IN element, Context context) throws IOException {
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark());
+
+		final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+		final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(element);
+	}
+
+	@Override
+	public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		List<FileSinkCommittable> committables = new ArrayList<>();
+
+		// Every time before we prepare a commit, we first check and remove the inactive
+		// buckets. Checking the activeness right before pre-committing avoids re-creating
+		// the bucket every time if the bucket uses OnCheckpointRollingPolicy.
+		Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+		while (activeBucketIt.hasNext()) {

Review comment:
       One concern for a bucket that only has pending files is that, since we check it after sending the committables, it would always be inactive, so we would need to remove it and add a new one in the next checkpoint period. Do you think this would be a problem? If we delay the check for one checkpoint period, we would only remove a bucket if it does not receive records for one checkpoint period.
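       Purely as an illustrative sketch of the "delay by one checkpoint period" idea (not part of this PR; it assumes an extra `java.util.Set` field on the writer), removal could require a bucket to be observed inactive in two consecutive `prepareCommit` calls:

```java
// Hypothetical sketch: a bucket is only dropped after it has been seen inactive
// at two consecutive prepareCommit calls, i.e. for one full checkpoint period.
private final Set<BucketID> inactiveSinceLastCheckpoint = new HashSet<>();

private void pruneInactiveBuckets() {
	Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> it =
			activeBuckets.entrySet().iterator();
	while (it.hasNext()) {
		Map.Entry<BucketID, FileWriterBucket<IN, BucketID>> entry = it.next();
		if (!entry.getValue().isActive()) {
			if (inactiveSinceLastCheckpoint.remove(entry.getKey())) {
				// Inactive for a full checkpoint period: safe to drop.
				it.remove();
			} else {
				// First time this bucket is seen inactive: keep it for one more period.
				inactiveSinceLastCheckpoint.add(entry.getKey());
			}
		} else {
			// The bucket received data again, so reset its inactivity marker.
			inactiveSinceLastCheckpoint.remove(entry.getKey());
		}
	}
}
```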

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.committer;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Committer implementation for {@link FileSink}.
+ */
+public class FileCommitter implements Committer<FileSinkCommittable> {
+	private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
+	private final BucketWriter<?, ?> bucketWriter;
+
+	public FileCommitter(BucketWriter<?, ?> bucketWriter) {
+		this.bucketWriter = checkNotNull(bucketWriter);
+	}
+
+	@Override
+	public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)  {
+		List<FileSinkCommittable> needRetry = new ArrayList<>();
+		for (FileSinkCommittable committable : committables) {
+			if (committable.hasPendingFile()) {
+				// We should always use commitAfterRecovery which contains additional checks.
+				try {
+					bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
+				} catch (IOException e) {
+					LOG.error("Failed to commit {}", committable.getPendingFile());
+					needRetry.add(committable);

Review comment:
       Agreed that we should not retry for now; I changed the method to return an empty list instead.
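       For illustration, the no-retry variant could look roughly like this. It is only a sketch based on the hunk above, assuming the failure is still just logged and that `java.util.Collections` is imported; the actual change may differ.

```java
	@Override
	public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) {
		for (FileSinkCommittable committable : committables) {
			if (committable.hasPendingFile()) {
				try {
					// commitAfterRecovery() performs the additional safety checks mentioned above.
					bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
				} catch (IOException e) {
					LOG.error("Failed to commit {}", committable.getPendingFile(), e);
				}
			}
		}
		// Nothing is handed back for retry anymore.
		return Collections.emptyList();
	}
```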

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+		implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+	private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+	private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+		this.bucketsBuilder = checkNotNull(bucketsBuilder);
+	}
+
+	@Override
+	public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+			InitContext context,
+			List<FileWriterBucketState<BucketID>> states) throws IOException {
+		FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+		writer.initializeState(states);
+		return writer;
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getWriterStateSerializer());
+	}
+
+	@Override
+	public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+		return Optional.of(bucketsBuilder.createCommitter());
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+			throws IOException {
+		return Optional.of(bucketsBuilder.getCommittableSerializer());
+	}
+
+	@Override
+	public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+		return Optional.empty();
+	}
+
+	@Override
+	public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+		return Optional.empty();
+	}
+
+	public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+			final Path basePath, final Encoder<IN> encoder) {
+		return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+	}
+
+	/**
+	 * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+	 */
+	@Internal
+	private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+			implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+		@SuppressWarnings("unchecked")
+		protected T self() {
+			return (T) this;
+		}
+
+		@Internal
+		protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+		@Internal
+		protected abstract FileCommitter createCommitter() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+		@Internal
+		protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+	}
+
+	/**
+	 * A builder for configuring the sink for row-wise encoding formats.
+	 */
+	public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+			extends BucketsBuilder<IN, BucketID, T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Path basePath;
+
+		private final Encoder<IN> encoder;
+
+		private BucketAssigner<IN, BucketID> bucketAssigner;
+
+		private RollingPolicy<IN, BucketID> rollingPolicy;
+
+		private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+		private OutputFileConfig outputFileConfig;
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> bucketAssigner) {
+			this(
+					basePath,
+					encoder,
+					bucketAssigner,
+					DefaultRollingPolicy.builder().build(),
+					new DefaultFileWriterBucketFactory<>(),
+					OutputFileConfig.builder().build());
+		}
+
+		protected RowFormatBuilder(
+				Path basePath,
+				Encoder<IN> encoder,
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy,
+				FileWriterBucketFactory<IN, BucketID> bucketFactory,
+				OutputFileConfig outputFileConfig) {
+			this.basePath = checkNotNull(basePath);
+			this.encoder = checkNotNull(encoder);
+			this.bucketAssigner = checkNotNull(assigner);
+			this.rollingPolicy = checkNotNull(policy);
+			this.bucketFactory = checkNotNull(bucketFactory);
+			this.outputFileConfig = checkNotNull(outputFileConfig);
+		}
+
+		public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+			this.bucketAssigner = checkNotNull(assigner);
+			return self();
+		}
+
+		public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+			this.rollingPolicy = checkNotNull(policy);
+			return self();
+		}
+
+		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+			this.outputFileConfig = outputFileConfig;
+			return self();
+		}
+
+		public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+				BucketAssigner<IN, BucketID> assigner,
+				RollingPolicy<IN, BucketID> policy) {
+			Preconditions.checkState(
+					bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+					"withNewBucketAssignerAndPolicy() cannot be called "
+							+ "after specifying a customized bucket factory");
+			return new RowFormatBuilder<>(
+					basePath,
+					encoder,
+					checkNotNull(assigner),
+					checkNotNull(policy),
+					bucketFactory,
+					outputFileConfig);
+		}
+
+		@VisibleForTesting
+		T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+			this.bucketFactory = Preconditions.checkNotNull(factory);
+			return self();
+		}
+
+		/** Creates the actual sink. */
+		public FileSink<IN, BucketID> build() {
+			return new FileSink<>(this);
+		}
+
+		@Override
+		public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       It seems we cannot change the modifier, since the method is inherited from an interface?

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@

Review comment:
       Yes, indeed, I changed the modifier to `package-private`.
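       For reference, a minimal standalone sketch of why this works: `BucketsBuilder` is an abstract class rather than an interface, so its abstract methods may be package-private and can still be overridden by builders in the same package. The class names below are made up for the example.

```java
// Both classes are assumed to live in the same package.
abstract class BucketsBuilderSketch<T> {
	// A package-private abstract method is allowed in an abstract class,
	// but not in an interface, where methods are implicitly public.
	abstract T createWriter();
}

class RowFormatBuilderSketch extends BucketsBuilderSketch<String> {
	@Override
	String createWriter() {
		return "row-format writer";
	}
}
```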

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+		implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+	// ------------------------ configuration fields --------------------------
+
+	private final Path basePath;
+
+	private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+	private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+	private final BucketWriter<IN, BucketID> bucketWriter;
+
+	private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+	// --------------------------- runtime fields -----------------------------
+
+	private final BucketerContext bucketerContext;
+
+	private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+	private final OutputFileConfig outputFileConfig;
+
+	// --------------------------- State Related Fields -----------------------------
+
+	private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+	/**
+	 * A constructor creating a new empty bucket manager.
+	 *
+	 * @param basePath The base path for our buckets.
+	 * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+	 * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+	 * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+	 * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+	 */
+	public FileWriter(
+			final Path basePath,
+			final BucketAssigner<IN, BucketID> bucketAssigner,
+			final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+			final BucketWriter<IN, BucketID> bucketWriter,
+			final RollingPolicy<IN, BucketID> rollingPolicy,
+			final OutputFileConfig outputFileConfig) {
+
+		this.basePath = checkNotNull(basePath);
+		this.bucketAssigner = checkNotNull(bucketAssigner);
+		this.bucketFactory = checkNotNull(bucketFactory);
+		this.bucketWriter = checkNotNull(bucketWriter);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
+
+		this.outputFileConfig = checkNotNull(outputFileConfig);
+
+		this.activeBuckets = new HashMap<>();
+		this.bucketerContext = new BucketerContext();
+
+		this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+				bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+				bucketAssigner.getSerializer());
+	}
+
+	/**
+	 * Initializes the state after recovery from a failure.
+	 *
+	 * <p>During this process:
+	 * <ol>
+	 *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+	 *     This guarantees that we do not overwrite valid data,</li>
+	 *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+	 *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+	 *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+	 * </ol>
+	 *
+	 * @param bucketStates the state holding recovered state about active buckets.
+	 *
+	 * @throws IOException if anything goes wrong while retrieving the state or restoring/committing any
+	 * 		in-progress/pending part files
+	 */
+	public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+		for (FileWriterBucketState<BucketID> state : bucketStates) {
+			BucketID bucketId = state.getBucketId();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Restoring: {}", state);
+			}
+
+			FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+					bucketWriter,
+					rollingPolicy,
+					state,
+					outputFileConfig);
+
+			updateActiveBucketId(bucketId, restoredBucket);
+		}
+	}
+
+	private void updateActiveBucketId(
+			BucketID bucketId,
+			FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+		final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+		if (bucket != null) {
+			bucket.merge(restoredBucket);
+		} else {
+			activeBuckets.put(bucketId, restoredBucket);
+		}
+	}
+
+	@Override
+	public void write(IN element, Context context) throws IOException {
+		// setting the values in the bucketer context
+		bucketerContext.update(
+				context.timestamp(),
+				context.currentWatermark());
+
+		final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+		final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+		bucket.write(element);
+	}
+
+	@Override
+	public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+		List<FileSinkCommittable> committables = new ArrayList<>();
+
+		// Every time before we prepare commit, we first check and remove the inactive
+		// buckets. Checking the activeness right before pre-committing avoids re-creating
+		// the bucket every time if the bucket uses OnCheckpointRollingPolicy.
+		Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+				activeBuckets.entrySet().iterator();
+		while (activeBucketIt.hasNext()) {

Review comment:
       No, not this issue; I was referring to the issue that we might need to re-create the bucket in each checkpoint period if `OnCheckpointRollingPolicy` is used and we detect the inactiveness after sending the committables. In that case the bucket will always be detected as inactive, since all committables are sent out right before the detection.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@

Review comment:
       No, not this issue; I mean the issue that we might need to re-create the bucket in each checkpoint period if `OnCheckpointRollingPolicy` is used and we detect the inactiveness after sending the committables. In that case the bucket will always be detected as inactive, since all committables are sent out right before the detection.
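       To make the ordering concrete, here is a rough sketch of the intent; it is not the PR code, and `removeInactiveBuckets()` as well as the per-bucket `prepareCommit()` call are only placeholders.

```java
	List<FileSinkCommittable> prepareCommitSketch(boolean flush) throws IOException {
		// Check activeness first: with OnCheckpointRollingPolicy the in-progress file is
		// rolled into a pending file below, so checking afterwards would always see the
		// bucket as "empty" and remove it, only to re-create it on the next record.
		removeInactiveBuckets();

		List<FileSinkCommittable> committables = new ArrayList<>();
		for (FileWriterBucket<IN, BucketID> bucket : activeBuckets.values()) {
			committables.addAll(bucket.prepareCommit(flush));
		}
		return committables;
	}
```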




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * af69ad89405936040b55d54acb0ef18e6141e5ad Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534) 
   * 8ad8ba6a6974bdc2295b372d538a95ec1a086b63 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * b8f151e4cedd920d57e4ec705cf7a79b534e0b2c Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887) 
   * 6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] aljoscha commented on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-721082335


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * 6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886) 
   * e49d604917caf8326cdfb3af129946d7030dba9f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * db5d2e491af1b183719c3365c57c3d4e5ee55b2a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526) 
   * ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8916",
       "triggerID" : "e49d604917caf8326cdfb3af129946d7030dba9f",
       "triggerType" : "PUSH"
     }, {
       "hash" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8940",
       "triggerID" : "90aacd961fbb0ed538fda229642a707f7afcca69",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * 90aacd961fbb0ed538fda229642a707f7afcca69 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8940) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0986c4559d84792d12e6652b323c8a0b93f3b415",
       "triggerType" : "PUSH"
     }, {
       "hash" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8820",
       "triggerID" : "fa784a03a5e1f2b3e1635b1151187129d04ef00d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8808",
       "triggerID" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b8f151e4cedd920d57e4ec705cf7a79b534e0b2c",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8887",
       "triggerID" : "721082335",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886",
       "triggerID" : "6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0986c4559d84792d12e6652b323c8a0b93f3b415 UNKNOWN
   * 6f5fac8cf5cf7e0ea5e87c1ca242869ee44fab17 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8886) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha closed pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
aljoscha closed pull request #13740:
URL: https://github.com/apache/flink/pull/13740


   





[GitHub] [flink] flinkbot edited a comment on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-714266782


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8078",
       "triggerID" : "af69ad89405936040b55d54acb0ef18e6141e5ad",
       "triggerType" : "PUSH"
     }, {
       "hash" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8526",
       "triggerID" : "db5d2e491af1b183719c3365c57c3d4e5ee55b2a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8534",
       "triggerID" : "ef1570ccdb0c2f1d148ce20d2cfb17bf02afafdd",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566",
       "triggerID" : "8ad8ba6a6974bdc2295b372d538a95ec1a086b63",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8ad8ba6a6974bdc2295b372d538a95ec1a086b63 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=8566) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] aljoscha commented on pull request #13740: [FLINK-19758][filesystem] Implement a new unified file sink based on the new sink API

Posted by GitBox <gi...@apache.org>.
aljoscha commented on pull request #13740:
URL: https://github.com/apache/flink/pull/13740#issuecomment-722274972


   Merged!

