Posted to user@flink.apache.org by ro...@hyatt.com on 2016/10/12 12:10:48 UTC

bucketing in RollingSink

Hi Flinksters,

At one stage in my data stream, I want to save the stream to a set of rolling files where the file name used (i.e. the bucket) is chosen based on an attribute of each data record.  Specifically, I’m using a windowing function to create aggregates of certain metrics and I want to save that data in a file with a name that identifies the window.

I was planning to write my own bucketer for this, but in version 1.1.2 the Bucketer interface doesn’t allow for the element being processed to be passed to the relevant methods (e.g. getNextBucketPath and shouldStartNewBucket).  I see that this is taken care of in 1.2, but since that isn’t available yet, can anyone recommend a workaround?  Alternatively, is there a way to have the DateTimeBucketer use assigned timestamps instead of system time?

________________________________
The information contained in this communication is confidential and intended only for the use of the recipient named above, and may be legally privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this communication in error, please resend it to the sender and delete the original message and copy of it from your computer system. Opinions, conclusions and other information in this message that do not relate to our official business should be understood as neither given nor endorsed by the company.

Re: bucketing in RollingSink

Posted by ro...@hyatt.com.
Hi Robert,

Thanks for the info on 1.2.

I copied over all 4 classes in fs.bucketing.  I had to make some changes in my version of BucketingSink.  BucketingSink relies on an updated StreamingRuntimeContext that provides a TimeServiceProvider, which the version from 1.1.2 doesn’t.

It was easy enough to get around that by changing each occurrence of this:
processingTimeService.getCurrentProcessingTime();
to this:
((StreamingRuntimeContext) getRuntimeContext()).getCurrentProcessingTime();

And this:
processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);
to this:
((StreamingRuntimeContext) getRuntimeContext()).registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this);

After making those changes, I implemented a custom Bucketer and was able to use my slightly altered version of BucketingSink without any problems.

Thanks!
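
For readers following along, here is a minimal sketch of the idea behind an element-aware bucketer: the bucket path is derived from the record itself (here, the start of its window) rather than from system time. It is an illustration under stated assumptions, not the code used in this thread. ElementBucketer, WindowAggregate, WindowStartBucketer, the path layout and the date pattern are all hypothetical names chosen for the example. The real interface in fs.bucketing works with Hadoop Path objects and has its own signature, so treat this only as the shape of the logic.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowBucketerSketch {

    /** Hypothetical stand-in for an element-aware bucketer; NOT Flink's actual interface. */
    interface ElementBucketer<T> {
        String getBucketPath(String basePath, T element);
    }

    /** Hypothetical windowed aggregate that carries the start timestamp of its window. */
    static final class WindowAggregate {
        final String metric;
        final long windowStartMillis;
        final double value;

        WindowAggregate(String metric, long windowStartMillis, double value) {
            this.metric = metric;
            this.windowStartMillis = windowStartMillis;
            this.value = value;
        }
    }

    /** Chooses the bucket from the record's window start instead of the system clock. */
    static final class WindowStartBucketer implements ElementBucketer<WindowAggregate> {
        // Assumed layout: <base>/<metric>/<window start in UTC, minute resolution>
        private static final DateTimeFormatter FORMAT =
                DateTimeFormatter.ofPattern("yyyy-MM-dd--HHmm").withZone(ZoneOffset.UTC);

        @Override
        public String getBucketPath(String basePath, WindowAggregate element) {
            String window = FORMAT.format(Instant.ofEpochMilli(element.windowStartMillis));
            return basePath + "/" + element.metric + "/" + window;
        }
    }

    public static void main(String[] args) {
        ElementBucketer<WindowAggregate> bucketer = new WindowStartBucketer();
        WindowAggregate agg = new WindowAggregate("bookings", 0L, 42.0);
        // prints "/data/out/bookings/1970-01-01--0000"
        System.out.println(bucketer.getBucketPath("/data/out", agg));
    }
}
```

Because the path depends only on the element, records from the same window land in the same bucket no matter when they reach the sink, which is the behaviour the original question asked for.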


From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Thursday, October 13, 2016 at 11:35 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: bucketing in RollingSink

Hi,

Let me know if you come across any issues with approach #2.

Predicting release dates is always hard. I would say we are in the middle of the development cycle. I can imagine that we have 1.2 released by the end of this year.
There is no tool that shows the progress, but I recently started a wiki page that tracks which features are likely to go into which release: https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan. By tracking those features (via JIRA, pull requests, discussions on the dev@ list), you can estimate the progress.

Regards,
Robert



On Wed, Oct 12, 2016 at 6:33 PM, <ro...@hyatt.com> wrote:
Hi Robert,

Thanks!   I’ll likely pursue option #2 and see if I can copy over the code from org.apache.flink….fs.bucketing.

Do you know a general timeline for when 1.2 will be released or perhaps a location where I could follow its progress?

Thanks again!

From: Robert Metzger <rm...@apache.org>
Reply-To: "user@flink.apache.org" <us...@flink.apache.org>
Date: Wednesday, October 12, 2016 at 5:50 PM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: bucketing in RollingSink

Hi Robert,

I see two possible workarounds:
1) You use the unreleased Flink 1.2-SNAPSHOT version. From time to time, there are some unstable commits in that version, but most of the time, it's quite stable.
We provide nightly binaries and maven artifacts for snapshot versions here: http://flink.apache.org/contribute-code.html#snapshots-nightly-builds

2) If that's too risky for you, you can also copy the code of the new (1.2-SNAPSHOT) bucketer from master into your project. The Apache license allows you to copy the code into your own projects, and I think the bucketer code doesn't rely on any features / APIs added in 1.2-SNAPSHOT, so you can probably run the new code on Flink 1.1 as well.

I hope that helps,

Regards,
Robert


On Wed, Oct 12, 2016 at 2:10 PM, <ro...@hyatt.com> wrote:
Hi Flinksters,

At one stage in my data stream, I want to save the stream to a set of rolling files where the file name used (i.e. the bucket) is chosen based on an attribute of each data record.  Specifically, I’m using a windowing function to create aggregates of certain metrics and I want to save that data in a file with a name that identifies the window.

I was planning to write my own bucketer for this, but in version 1.1.2 the Bucketer interface doesn’t allow for the element being processed to be passed to the relevant methods (e.g. getNextBucketPath and shouldStartNewBucket).  I see that this is taken care of in 1.2, but since that isn’t available yet, can anyone recommend a workaround?  Alternatively, is there a way to have the DateTimeBucketer use assigned timestamps instead of system time?


Re: bucketing in RollingSink

Posted by Robert Metzger <rm...@apache.org>.
Hi,

Let me know if you come across any issues with approach #2.

Predicting release dates is always hard. I would say we are in the middle
of the development cycle. I can imagine that we have 1.2 released by the
end of this year.
There is no tool that shows the progress, but I recently started a wiki page
that tracks which features are likely to go into which release:
https://cwiki.apache.org/confluence/display/FLINK/Flink+Release+and+Feature+Plan
By tracking those features (via JIRA, pull requests, discussions on the
dev@ list), you can estimate the progress.

Regards,
Robert




Re: bucketing in RollingSink

Posted by ro...@hyatt.com.
Hi Robert,

Thanks!   I’ll likely pursue option #2 and see if I can copy over the code from org.apache.flink….fs.bucketing.

Do you know a general timeline for when 1.2 will be released or perhaps a location where I could follow its progress?

Thanks again!


Re: bucketing in RollingSink

Posted by Robert Metzger <rm...@apache.org>.
Hi Robert,

I see two possible workarounds:
1) You use the unreleased Flink 1.2-SNAPSHOT version. From time to time,
there are some unstable commits in that version, but most of the time, it's
quite stable.
We provide nightly binaries and maven artifacts for snapshot versions here:
http://flink.apache.org/contribute-code.html#snapshots-nightly-builds

2) If that's too risky for you, you can also copy the code of the new
(1.2-SNAPSHOT) bucketer from master into your project. The Apache license
allows you to copy the code into your own projects, and I think the
bucketer code doesn't rely on any features / APIs added in 1.2-SNAPSHOT, so
you can probably run the new code on Flink 1.1 as well.

I hope that helps,

Regards,
Robert

