Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/05 07:43:00 UTC

[jira] [Commented] (FLINK-9752) Add an S3 RecoverableWriter

    [ https://issues.apache.org/jira/browse/FLINK-9752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16639420#comment-16639420 ] 

ASF GitHub Bot commented on FLINK-9752:
---------------------------------------

kl0u opened a new pull request #6795: [FLINK-9752][s3-fs-connector] Add s3 recoverable writer.
URL: https://github.com/apache/flink/pull/6795
 
 
   ## What is the purpose of the change
   
   Adds the recoverable writer for S3. The new recoverable writer is only available for **Hadoop S3**
   (**not Presto** for now) and uses the MultiPart feature to upload part files.
   
   The user calls `fs.createRecoverableWriter()`, which returns an `S3RecoverableWriter`. With it, the user can either `open(Path)` to obtain an `S3RecoverableFsDataOutputStream`, or `recover()` a previous such stream from a checkpoint.
   
   The main functionality is implemented by the `S3RecoverableFsDataOutputStream`. This uses:
   1) a `RefCountedFSOutputStream`, a file stream backed by a local tmp file that is reference counted and deleted once no references remain, to write a part of the multi-part upload.
   2) a `RecoverableMultiPartUpload` to take snapshots of in-flight Multi-Part Uploads (MPU) and to upload parts that are ready.
   3) a `Committer`, which the user can get from the stream and which completes the MPU, i.e. "publishes" the data.
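   
   To illustrate the reference-counting idea in item 1, here is a minimal sketch. `RefCountedTmpFileSketch` and its methods are hypothetical names chosen for this example and are not the classes added by this PR; the sketch only shows how a tmp file can be deleted once the last reference is released.
   
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.util.concurrent.atomic.AtomicInteger;
   
   /** Hypothetical stand-in for a reference-counted tmp file; not the PR's implementation. */
   final class RefCountedTmpFileSketch {
       private final Path file;
       // The creator holds the first reference.
       private final AtomicInteger references = new AtomicInteger(1);
   
       private RefCountedTmpFileSketch(Path file) {
           this.file = file;
       }
   
       static RefCountedTmpFileSketch create() {
           try {
               return new RefCountedTmpFileSketch(Files.createTempFile("part-", ".tmp"));
           } catch (IOException e) {
               throw new UncheckedIOException(e);
           }
       }
   
       Path path() {
           return file;
       }
   
       /** Adds a reference; fails if the file was already released completely. */
       void retain() {
           int current;
           do {
               current = references.get();
               if (current == 0) {
                   throw new IllegalStateException("file already released");
               }
           } while (!references.compareAndSet(current, current + 1));
       }
   
       /** Drops a reference; returns true if this call deleted the file. */
       boolean release() {
           int current;
           do {
               current = references.get();
               if (current == 0) {
                   throw new IllegalStateException("file already released");
               }
           } while (!references.compareAndSet(current, current - 1));
           if (current == 1) {
               try {
                   Files.deleteIfExists(file);
               } catch (IOException e) {
                   throw new UncheckedIOException(e);
               }
               return true;
           }
           return false;
       }
   }
   ```
   
   In this sketch, a writer and an in-flight upload would each hold a reference, so the tmp file survives until both are done with it.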
   
   The whole process is a two-phase commit, with files being staged for commit, and then committed as a unit.
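   
   The two-phase pattern can be pictured with a toy model. `TwoPhaseCommitSketch` is a hypothetical class for illustration only; in-memory maps stand in for staged and published files, and nothing here talks to S3.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   /** Toy model of stage-then-commit; not the PR's Committer. */
   final class TwoPhaseCommitSketch {
       private final Map<String, String> staged = new LinkedHashMap<>();
       private final Map<String, String> published = new LinkedHashMap<>();
   
       /** Phase 1: data is written but not yet visible to readers. */
       void stage(String name, String contents) {
           staged.put(name, contents);
       }
   
       /** Phase 2: everything staged becomes visible as a unit. */
       void commit() {
           published.putAll(staged);
           staged.clear();
       }
   
       /** Drops staged data without publishing it. */
       void abort() {
           staged.clear();
       }
   
       boolean isVisible(String name) {
           return published.containsKey(name);
       }
   }
   ```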
   
   **Checkpointing / Recovery**
   
   As the user writes data to the stream, each part is uploaded to S3 once it reaches a minimum size, and a new part file is opened. An uploaded part is identified by its `PartETag`, which is later used when "committing" the MPU, so the list of `PartETag`s associated with the MPU is stored in state.
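   
   A rough sketch of this part-rolling behaviour, under simplifying assumptions: `PartBufferSketch` is a hypothetical class, parts are buffered in memory rather than in a tmp file, and the tag strings are made up locally where S3 would return real ETags.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical model of rolling parts at a minimum size; no S3 calls are made. */
   final class PartBufferSketch {
       private final int minPartSize;
       private final List<String> completedPartTags = new ArrayList<>();
       private ByteArrayOutputStream current = new ByteArrayOutputStream();
   
       PartBufferSketch(int minPartSize) {
           this.minPartSize = minPartSize;
       }
   
       void write(byte[] data) {
           current.write(data, 0, data.length);
           // Once the current part is large enough, "upload" it and open a new one.
           if (current.size() >= minPartSize) {
               uploadCurrentPart();
           }
       }
   
       private void uploadCurrentPart() {
           int partNumber = completedPartTags.size() + 1;
           // Assumption: a real upload would return the tag from S3; this one is fabricated.
           completedPartTags.add("etag-" + partNumber + "-" + current.size());
           current = new ByteArrayOutputStream();
       }
   
       /** The tags that would be kept in state for the later commit. */
       List<String> completedPartTags() {
           return new ArrayList<>(completedPartTags);
       }
   
       int pendingBytes() {
           return current.size();
       }
   }
   ```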
   
   When `persist` is called, the "current" part file, which has not yet reached the minimum size, is uploaded to S3 as an independent object (not as part of the MPU), and its handle is stored in state. After that, we keep writing to the same part file.
   
   Upon recovery, we retrieve the set of valid `PartETag`s from the state, download the "in-progress" part file that was uploaded as an independent object, and resume writing from there. Any parts uploaded between the last successful checkpoint and the failure are simply discarded. This removes any need for active truncation.
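   
   The persist/recover cycle can be sketched as follows. All names (`UploadSnapshot`, `RecoverableUploadSketch`) are hypothetical, the byte array in the snapshot stands in for the handle of the independently uploaded object, and the tags are fabricated locally. The point is only that a restored writer sees the parts known at the checkpoint and nothing uploaded afterwards.
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.util.ArrayList;
   import java.util.List;
   
   /** Hypothetical checkpoint state: confirmed part tags plus the unfinished tail. */
   final class UploadSnapshot {
       final List<String> partTags;
       final byte[] inProgressTail; // stands in for the handle of the independent object
   
       UploadSnapshot(List<String> partTags, byte[] inProgressTail) {
           this.partTags = new ArrayList<>(partTags);
           this.inProgressTail = inProgressTail.clone();
       }
   }
   
   /** Hypothetical in-memory model of a recoverable upload; no S3 calls are made. */
   final class RecoverableUploadSketch {
       private final int minPartSize;
       private final List<String> partTags = new ArrayList<>();
       private final ByteArrayOutputStream current = new ByteArrayOutputStream();
   
       RecoverableUploadSketch(int minPartSize) {
           this.minPartSize = minPartSize;
       }
   
       void write(byte[] data) {
           current.write(data, 0, data.length);
           if (current.size() >= minPartSize) {
               // Fabricated tag; S3 would return the real one.
               partTags.add("etag-" + (partTags.size() + 1));
               current.reset();
           }
       }
   
       /** Captures the state at a checkpoint; writing continues on the same part. */
       UploadSnapshot persist() {
           return new UploadSnapshot(partTags, current.toByteArray());
       }
   
       /** Restores a writer from a snapshot; later parts are unknown to it and thus dropped. */
       static RecoverableUploadSketch recover(UploadSnapshot snapshot, int minPartSize) {
           RecoverableUploadSketch restored = new RecoverableUploadSketch(minPartSize);
           restored.partTags.addAll(snapshot.partTags);
           restored.current.write(snapshot.inProgressTail, 0, snapshot.inProgressTail.length);
           return restored;
       }
   
       List<String> partTags() {
           return new ArrayList<>(partTags);
       }
   
       int pendingBytes() {
           return current.size();
       }
   }
   ```
   
   Because the restored writer is rebuilt only from the snapshot, no explicit truncation step is needed in this model.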
   
   ## Brief change log
   
   This PR consists mainly of new code, so a change log is of little help. The description above explains what the code does.
   
   ## Verifying this change
   
   This change added Unit Tests and can be verified as follows:
   * `RefCountedBufferingFileStreamTest`
   * `RefCountedFileTest`
   * `IncompletePartPrefixTest`
   * `S3RecoverableFsDataOutputStreamTest`
   
   And semi end-to-end tests against actual S3:
   * `HadoopS3RecoverableWriterTest`
   * `HadoopS3RecoverableWriterExceptionTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (**yes** / no / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented** yet)
   
   R @pnowojski 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add an S3 RecoverableWriter
> ---------------------------
>
>                 Key: FLINK-9752
>                 URL: https://issues.apache.org/jira/browse/FLINK-9752
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2
>
>
> S3 offers persistence only when uploads are complete. That means at the end of simple uploads and uploads of parts of a MultiPartUpload.
> We should implement a RecoverableWriter for S3 that does a MultiPartUpload with a Part per checkpoint.
> Recovering the writer needs the MultiPartUploadID and the list of ETags of previous parts.
> We need additional staging of data in Flink state to work around the fact that
>  - Parts in a MultiPartUpload must be at least 5MB
>  - Part sizes must be known up front. (Note that data can still be streamed in the upload)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)