Posted to issues@flink.apache.org by "Josh Forman-Gornall (JIRA)" <ji...@apache.org> on 2016/07/11 14:59:11 UTC

[jira] [Comment Edited] (FLINK-4190) Generalise RollingSink to work with arbitrary buckets

    [ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15370921#comment-15370921 ] 

Josh Forman-Gornall edited comment on FLINK-4190 at 7/11/16 2:58 PM:
---------------------------------------------------------------------

I needed this feature and already have a working solution - the changes I made can be found here:
https://github.com/joshfg/flink/tree/flink-4190

If people think these changes are good and will be useful to others, I will submit a PR.

There are three main changes:
- The Bucketer interface now takes the type of the sink's input element as a generic parameter, so we can bucket based on attributes of the sink's input.
- While maintaining the same rolling mechanics of the existing implementation (e.g. rolling when the file size reaches a threshold), the sink implementation can now have many 'active' buckets at any point in time. The checkpointing mechanics have been extended to support maintaining the state of multiple active buckets and files, instead of just one.
- For use cases where the buckets being written to change over time, the sink now needs to determine when a bucket has become 'inactive', in order to flush and close its file. In the existing implementation, this is simply when the bucket path changes. Instead, we now consider a bucket inactive if it hasn't been written to recently. To support this there are two additional user-configurable settings: inactiveBucketCheckInterval and inactiveBucketThreshold.
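
To make the three changes above concrete, here is a minimal, self-contained sketch of the idea. It is not the code from the branch linked above: ElementBucketer, ActiveBucketTracker and their methods are hypothetical names made up for illustration, timestamps are passed in explicitly instead of being read from a clock, and file handling and checkpointing are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical element-aware bucketer: maps an input element to a bucket path. */
interface ElementBucketer<T> {
    String bucketPathFor(T element);
}

/** Hypothetical bookkeeping for many active buckets (illustration only). */
class ActiveBucketTracker<T> {
    private final ElementBucketer<T> bucketer;
    // Plays the role of inactiveBucketThreshold, in milliseconds.
    private final long inactiveThresholdMillis;
    // Bucket path -> time of the most recent write to that bucket.
    private final Map<String, Long> lastWrite = new HashMap<>();

    ActiveBucketTracker(ElementBucketer<T> bucketer, long inactiveThresholdMillis) {
        this.bucketer = bucketer;
        this.inactiveThresholdMillis = inactiveThresholdMillis;
    }

    /** Records a write and returns the bucket the element was routed to. */
    String write(T element, long nowMillis) {
        String bucket = bucketer.bucketPathFor(element);
        lastWrite.put(bucket, nowMillis);
        return bucket;
    }

    /**
     * Meant to be called once per inactiveBucketCheckInterval. Removes and
     * returns every bucket not written to within the threshold; a real sink
     * would flush and close the corresponding files at this point.
     */
    List<String> closeInactive(long nowMillis) {
        List<String> closed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastWrite.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= inactiveThresholdMillis) {
                closed.add(entry.getKey());
                it.remove();
            }
        }
        return closed;
    }

    int activeBucketCount() {
        return lastWrite.size();
    }
}
```

With a bucketer such as `e -> "country=" + e[0]` over String[] elements, writes for different countries keep several buckets active at once, and a bucket only drops out of the active set once a periodic check finds it has been idle for at least the threshold.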

Also, I've renamed RollingSink to BucketingSink to reflect its more general use.

Any comments are welcome!




> Generalise RollingSink to work with arbitrary buckets
> -----------------------------------------------------
>
>                 Key: FLINK-4190
>                 URL: https://issues.apache.org/jira/browse/FLINK-4190
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector, Streaming Connectors
>            Reporter: Josh Forman-Gornall
>            Assignee: Josh Forman-Gornall
>            Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to directories that are bucketed by system time (e.g. one bucket per minute), and for writing to only one file within one bucket at any point in time. When the system time determines that the current bucket should change, the current bucket and file are closed and a new bucket and file are created. The sink cannot be used for the more general problem of writing to arbitrary buckets, perhaps determined by an attribute of the element/tuple being processed.
> There are three limitations which prevent the existing sink from being used for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes the file and moves on to the new bucket. Therefore, the sink cannot have more than one bucket/file open at a time. Additionally, the checkpointing mechanics only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when the bucket path changes. We need another way to determine when a bucket has become inactive and needs to be closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)