Posted to user@flink.apache.org by Christopher Santiago <ch...@ninjametrics.com> on 2016/04/28 06:03:40 UTC

Multiple windows with large number of partitions

I've been working through the Flink demo applications and have started on a
prototype, but I've run into an issue with how to approach the problem of
getting a daily unique user count from a traffic stream.  I'm using event
time as the time characteristic.

Sample event stream
(timestamp,userid):
2015-12-02T01:13:21.002Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:13:21.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.003Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T01:37:48.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T02:02:15.004Z,bc030136a91aa46eb436dcb28fa72fed
2015-12-02T00:00:00.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:16:58.000Z,5dd63d9756a975d0f4be6a6856005381
2015-12-02T00:00:00.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.000Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:14:56.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.001Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:29:52.002Z,ccd72e4535c92c499bb66eea6f4f9aab
2015-12-02T00:44:48.002Z,ccd72e4535c92c499bb66eea6f4f9aab

Requirements:
1.  Get a count of the unique users by day.
2.  Early results should be emitted as quickly as possible.  (I've been
trying to use 30/60 seconds windows)
3.  Events are accepted up to 2 days late.

I've used the following as guidance:

EventTimeTriggerWithEarlyAndLateFiring
https://raw.githubusercontent.com/kl0u/flink-examples/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java

Multi-window transformations
http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CBAY182-W870B521BDC5709973990D5ED9A0%40phx.gbl%3E

Summing over aggregate windows
http://stackoverflow.com/questions/36791816/how-to-declare-1-minute-tumbling-window

but I can't get the reduce/aggregation logic working correctly.  Here's a
sample of how I have the windows set up, with a DataStream of Tuple3 holding
timestamp, date only, and userid:

DataStream<Tuple3<DateTime, String, String>> uniqueLogins = logins
    .keyBy(1, 2)
    .timeWindow(Time.days(1))
    // EventTimeNoLog is the same as EventTimeTriggerWithEarlyAndLateFiring,
    // just with modified logging since it is potentially called 2-3x per
    // event read in
    .trigger(EventTimeNoLog.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2)))
    // Reduce to the earliest timestamp for a given day for a user
    .reduce(new ReduceFunction<Tuple3<DateTime, String, String>>() {
        @Override
        public Tuple3<DateTime, String, String> reduce(
                Tuple3<DateTime, String, String> event1,
                Tuple3<DateTime, String, String> event2) {
            // Keep whichever event carries the earlier timestamp
            return event1.f0.isBefore(event2.f0) ? event1 : event2;
        }
    });

SingleOutputStreamOperator<Tuple2<String, Long>> window = uniqueLogins
    .timeWindowAll(Time.days(1))
    // Modified EventTimeTriggerWithEarlyAndLateFiring that does a
    // fire_and_purge in onProcessingTime when aggregator is set
    .trigger(EventTimeTriggerWithEarlyAndLateFiring.create()
        .withEarlyFiringEvery(Time.seconds(60))
        .withLateFiringEvery(Time.seconds(600))
        .withAllowedLateness(Time.days(2))
        .aggregator())
    // Manually count
    .apply(new AllWindowFunction<Tuple3<DateTime, String, String>,
            Tuple2<String, Long>, TimeWindow>() {
        @Override
        public void apply(TimeWindow window,
                Iterable<Tuple3<DateTime, String, String>> input,
                Collector<Tuple2<String, Long>> out) throws Exception {
            long count = 0;
            String windowTime = null;

            // Each element is one (day, user) result from the first window
            for (Tuple3<DateTime, String, String> login : input) {
                windowTime = login.f1;
                count++;
            }
            out.collect(new Tuple2<String, Long>(windowTime, count));
        }
    });

From the logging I've put in place, it seems that there is a performance
issue with the first keyBy, where there is now a unique window for each
date/user combination (around 500k windows in my sample data). When
reducing, these windows do not emit results at a constant enough rate for
the second window to perform its aggregation at a schedulable interval.  Is
there a better approach to performing this type of calculation directly in
Flink?
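
To illustrate why the window count grows this way, here is a minimal plain-Java sketch (not Flink code; the class and method names are hypothetical). It assumes 1-day tumbling windows aligned to the epoch, i.e. UTC midnight, and counts how many distinct windows exist when the stream is keyed by user versus when only the day window is considered. The rows are taken from the sample event stream above, plus one made-up row on the following day.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;

public class WindowCountSketch {
    static final long DAY_MILLIS = Duration.ofDays(1).toMillis();

    /** Start of the epoch-aligned 1-day tumbling window containing the timestamp. */
    static long windowStart(long timestampMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, DAY_MILLIS);
    }

    /** One keyed window exists per distinct (day window, user) pair. */
    static int keyedWindowCount(String[][] events) {
        Set<String> windows = new HashSet<>();
        for (String[] e : events) {
            windows.add(windowStart(Instant.parse(e[0]).toEpochMilli()) + "|" + e[1]);
        }
        return windows.size();
    }

    /** Without a key, only one window exists per day. */
    static int dayWindowCount(String[][] events) {
        Set<Long> windows = new HashSet<>();
        for (String[] e : events) {
            windows.add(windowStart(Instant.parse(e[0]).toEpochMilli()));
        }
        return windows.size();
    }

    public static void main(String[] args) {
        String[][] events = {
            {"2015-12-02T01:13:21.002Z", "bc030136a91aa46eb436dcb28fa72fed"},
            {"2015-12-02T01:37:48.003Z", "bc030136a91aa46eb436dcb28fa72fed"},
            {"2015-12-02T00:00:00.000Z", "5dd63d9756a975d0f4be6a6856005381"},
            {"2015-12-02T00:14:56.000Z", "ccd72e4535c92c499bb66eea6f4f9aab"},
            // Hypothetical row (not in the sample data) to show a second day
            {"2015-12-03T08:00:00.000Z", "bc030136a91aa46eb436dcb28fa72fed"},
        };
        System.out.println(keyedWindowCount(events)); // 4
        System.out.println(dayWindowCount(events));   // 2
    }
}
```

The keyed count scales roughly with users times days, which is consistent with the ~500k windows reported for the sample data.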

Re: Multiple windows with large number of partitions

Posted by Aljoscha Krettek <al...@apache.org>.
Yes, please go ahead. That would be helpful.

On Mon, 2 May 2016 at 21:56 Christopher Santiago <ch...@ninjametrics.com>
wrote:

> Hi Aljoscha,
>
> Yes, there is still a high partition/window count since I have to keyby
> the userid so that I get unique users.  I believe what I see happening is
> that the second window with the timeWindowAll is not getting all the
> results or the results from the previous window are changing when the
> second window is running.  I can see the date/unique user count increase
> and decrease as it is running for a particular day.
>
> I can share the eclipse project and the sample data file I am working off
> of with you if that would be helpful.
>
> Thanks,
> Chris

Re: Multiple windows with large number of partitions

Posted by Christopher Santiago <ch...@ninjametrics.com>.
Hi Aljoscha,

Yes, there is still a high partition/window count since I have to keyBy the
userid so that I get unique users.  I believe what I see happening is that
the second window with the timeWindowAll is either not getting all the
results, or the results from the previous window are changing while the
second window is running.  I can see the date/unique user count increase and
decrease as it is running for a particular day.

I can share the Eclipse project and the sample data file I am working from
with you if that would be helpful.

Thanks,
Chris

On Mon, May 2, 2016 at 12:55 AM, Aljoscha Krettek [via Apache Flink User
Mailing List archive.] <ml...@n4.nabble.com> wrote:

> Hi,
> what do you mean by "still experiencing the same issues"? Is the key count
> still very high, i.e. 500k windows?
>
> For the watermark generation, specifying a lag of 2 days is very
> conservative. If the watermark is this conservative I guess there will
> never arrive elements that are behind the watermark, thus you wouldn't need
> the late-element handling in your triggers. The late-element handling in
> Triggers is only required to compensate for the fact that the watermark can
> be a heuristic and not always correct.
>
> Cheers,
> Aljoscha

Re: Multiple windows with large number of partitions

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
What do you mean by "still experiencing the same issues"? Is the key count
still very high, i.e. 500k windows?

For the watermark generation, specifying a lag of 2 days is very
conservative. If the watermark is this conservative, I guess elements that
are behind the watermark will never arrive, so you wouldn't need the
late-element handling in your triggers. The late-element handling in
Triggers is only required to compensate for the fact that the watermark can
be a heuristic and is not always correct.
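
To make the watermark argument concrete, here is a minimal plain-Java sketch of the arithmetic used by the BoundedOutOfOrdernessGenerator quoted in this thread (no Flink dependency; the class and method names are hypothetical, and the timestamps are made up for illustration). The watermark trails the largest timestamp seen so far by the configured lag, so an element only counts as behind the watermark if it is older than the newest element by more than that lag.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkLagSketch {
    private final long maxOutOfOrdernessMillis;
    // Starts at 0, like the uninitialized field in the generator from the thread
    private long currentMaxTimestamp = 0L;

    public WatermarkLagSketch(Duration maxOutOfOrderness) {
        this.maxOutOfOrdernessMillis = maxOutOfOrderness.toMillis();
    }

    /** Watermark = largest timestamp seen so far minus the allowed lag. */
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    /** Records a timestamp; returns true if it was behind the watermark on arrival. */
    public boolean observe(long timestampMillis) {
        boolean behindWatermark = timestampMillis < currentWatermark();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMillis);
        return behindWatermark;
    }

    public static void main(String[] args) {
        WatermarkLagSketch wm = new WatermarkLagSketch(Duration.ofDays(2));
        long newest = Instant.parse("2015-12-04T00:00:00Z").toEpochMilli();
        long oneDayOld = Instant.parse("2015-12-03T00:00:00Z").toEpochMilli();
        long threeDaysOld = Instant.parse("2015-12-01T00:00:00Z").toEpochMilli();

        System.out.println(wm.observe(newest));       // false
        System.out.println(wm.observe(oneDayOld));    // false: within the 2-day lag
        System.out.println(wm.observe(threeDaysOld)); // true: more than 2 days behind
    }
}
```

With a 2-day lag and a requirement to accept events up to 2 days late, only elements beyond the accepted lateness would ever fall behind the watermark in this model.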

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 21:24 Christopher Santiago <ch...@ninjametrics.com>
wrote:

> Hi Aljoscha,
>
>
> Aljoscha Krettek wrote
> >>Is there a reason for keying on both the "date only" field and the
> "userid"? I think you should be fine by just specifying that you want 1-day
> windows on your timestamps.
>
> My mistake, this was from earlier tests that I had performed.  I removed it
> and went to keyBy(2) and I am still experiencing the same issues.
>
>
> Aljoscha Krettek wrote
> >>Also, do you have a timestamp extractor in place that takes the timestamp
> from your data and sets it as the internal timestamp field?
>
> Yes there is, it is from the BoundedOutOfOrdernessGenerator example:
>
>     public static class BoundedOutOfOrdernessGenerator implements
>             AssignerWithPeriodicWatermarks<Tuple3<DateTime, String, String>> {
>         private static final long serialVersionUID = 1L;
>         private final long maxOutOfOrderness = Time.days(2).toMilliseconds();
>         private long currentMaxTimestamp;
>
>         @Override
>         public long extractTimestamp(Tuple3<DateTime, String, String> element,
>                 long previousElementTimestamp) {
>             long timestamp = element.f0.getMillis();
>             currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
>             return timestamp;
>         }
>
>         @Override
>         public Watermark getCurrentWatermark() {
>             return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
>         }
>     }
>
> Thanks,
> Chris

Re: Multiple windows with large number of partitions

Posted by Christopher Santiago <ch...@ninjametrics.com>.
Hi Aljoscha,


Aljoscha Krettek wrote
>>Is there a reason for keying on both the "date only" field and the
"userid"? I think you should be fine by just specifying that you want 1-day
windows on your timestamps.

My mistake, this was from earlier tests that I had performed.  I removed it
and went to keyBy(2) and I am still experiencing the same issues.


Aljoscha Krettek wrote
>>Also, do you have a timestamp extractor in place that takes the timestamp
from your data and sets it as the internal timestamp field?

Yes there is, it is from the BoundedOutOfOrdernessGenerator example:

    public static class BoundedOutOfOrdernessGenerator implements
            AssignerWithPeriodicWatermarks<Tuple3<DateTime, String, String>> {
        private static final long serialVersionUID = 1L;
        private final long maxOutOfOrderness = Time.days(2).toMilliseconds();
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Tuple3<DateTime, String, String> element,
                long previousElementTimestamp) {
            long timestamp = element.f0.getMillis();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }

Thanks,
Chris




Re: Multiple windows with large number of partitions

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
Is there a reason for keying on both the "date only" field and the
"userid"? I think you should be fine by just specifying that you want 1-day
windows on your timestamps.

Also, do you have a timestamp extractor in place that takes the timestamp
from your data and sets it as the internal timestamp field? This is
explained in more detail here:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_timestamps_watermarks.html#timestamp-assigners--watermark-generators
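
The point that the day can be derived from the event timestamp, rather than carried as a separate key field, can be illustrated outside Flink. The following is a minimal plain-Java sketch under that assumption; it is not Flink code and not the approach either poster proposes, and the class and method names are hypothetical. It keeps one set of user ids per UTC day, so the set size is the running unique-user count for that day. The rows come from the sample event stream in the original post.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class DailyUniqueUsersSketch {
    // One set of user ids per UTC day, derived from the event timestamp
    private final Map<LocalDate, Set<String>> usersByDay = new TreeMap<>();

    /** Adds one event and returns the updated unique-user count for its day. */
    public int add(String isoTimestamp, String userId) {
        LocalDate day = Instant.parse(isoTimestamp).atZone(ZoneOffset.UTC).toLocalDate();
        Set<String> users = usersByDay.computeIfAbsent(day, d -> new HashSet<>());
        users.add(userId); // repeated events from the same user do not change the size
        return users.size();
    }

    /** Unique users seen so far for the given day (0 if no events were seen). */
    public int uniqueUsers(LocalDate day) {
        Set<String> users = usersByDay.get(day);
        return users == null ? 0 : users.size();
    }

    public static void main(String[] args) {
        DailyUniqueUsersSketch counts = new DailyUniqueUsersSketch();
        counts.add("2015-12-02T01:13:21.002Z", "bc030136a91aa46eb436dcb28fa72fed");
        counts.add("2015-12-02T01:13:21.003Z", "bc030136a91aa46eb436dcb28fa72fed");
        counts.add("2015-12-02T00:00:00.000Z", "5dd63d9756a975d0f4be6a6856005381");
        counts.add("2015-12-02T00:16:58.000Z", "5dd63d9756a975d0f4be6a6856005381");
        counts.add("2015-12-02T00:00:00.000Z", "ccd72e4535c92c499bb66eea6f4f9aab");
        System.out.println(counts.uniqueUsers(LocalDate.of(2015, 12, 2))); // 3
    }
}
```

Holding one set per day trades many small per-user windows for a single larger piece of state whose size grows with the number of distinct users that day, so whether it helps depends on the data volume.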

Cheers,
Aljoscha
