Posted to user@flink.apache.org by Basanth Gowda <ba...@gmail.com> on 2017/08/13 18:09:14 UTC

Aggregation by key hierarchy

Hi,
I want to aggregate hits by Country, State, City. I would have these as tags
in my sample data.

How would I do aggregation at different levels? The input data would be a
single record.

Should I do a flatMap transformation first and create 3 records from 1 input
record, or is there a better way to do it?

thank you,
basanth

Re: Aggregation by key hierarchy

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Basanth,
no, you cannot add data streams or re-wire your program during runtime.
As for any other program changes, you'd have to take a savepoint (to keep 
operator state and exactly-once semantics) and restart the new program code 
from there.

For a few combinations, I'd probably choose the second option for simplicity 
but for more combinations, option 1 seems better (mapping your key 
combinations to different tuple-keys, key-by this one and applying window 
operations afterwards).
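
A minimal sketch of the key-mapping step of option 1, written as plain Java without any Flink dependency. It shows how one input record could be expanded into one composite key per configured combination, which is the logic a flatMap function would carry before the key-by and window steps. The class name KeyExpansion, the method expand, and the "fields=values" key format are assumptions made for illustration; they do not come from the thread or from Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyExpansion {
    /**
     * Builds one composite key per configured combination,
     * e.g. "country,state=US|CA" (hypothetical format).
     */
    static List<String> expand(Map<String, String> record, List<List<String>> combinations) {
        List<String> keys = new ArrayList<>();
        for (List<String> combo : combinations) {
            List<String> values = new ArrayList<>();
            for (String field : combo) {
                values.add(record.get(field));
            }
            // The field names are part of the key so that different combinations never collide.
            keys.add(String.join(",", combo) + "=" + String.join("|", values));
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<String, String> record = new LinkedHashMap<>();
        record.put("country", "US");
        record.put("state", "CA");
        record.put("city", "LA");
        List<List<String>> combos = Arrays.asList(
            Arrays.asList("country"),
            Arrays.asList("country", "state"),
            Arrays.asList("country", "state", "city"));
        // One output key per combination; each would become one record emitted by the flatMap.
        System.out.println(expand(record, combos));
    }
}
```

Because the combinations are plain data here, they could be read from an external config, which is the property Basanth mentions for this option.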

Option 2 may also require more slots to be available since it has more 
operators [1] and may not be evenly balanced based on your input data and the 
work associated with it. Since option 1's window operators aggregate all 
different tuples, load distribution may be better. Other than that, the 
communication pattern is similar. To get a better understanding of the 
performance impacts, you'd have to benchmark with your aggregation and input 
data though.


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/programming-model.html#parallel-dataflows

On Wednesday, 16 August 2017 13:24:42 CEST Basanth Gowda wrote:
> Thanks Nico.
> 
> As there are 2 ways to achieve this, which is better?
> 
> 1st option -> dataStream.flatMap( ... ) -> this takes in the input and
> provides me N outputs, depending on my key combinations. The same windowing
> logic is then applied to each output
> 
> or the one you suggested
> 
> 2nd option -> use keyBy to create N streams
> 
> With the first option I would use an external config, which allows me to
> change the number of combinations dynamically at runtime. Would that be
> possible with the 2nd option as well? Can I modify or add a data stream at
> runtime without restarting?


Re: Aggregation by key hierarchy

Posted by Basanth Gowda <ba...@gmail.com>.
Thanks Nico.

As there are 2 ways to achieve this, which is better?

1st option -> dataStream.flatMap( ... ) -> this takes in the input and
provides me N outputs, depending on my key combinations. The same windowing
logic is then applied to each output

or the one you suggested

2nd option -> use keyBy to create N streams

With the first option I would use an external config, which allows me to
change the number of combinations dynamically at runtime. Would that be
possible with the 2nd option as well? Can I modify or add a data stream at
runtime without restarting?

On Wed, Aug 16, 2017 at 4:37 AM, Nico Kruber <ni...@data-artisans.com> wrote:

> [back to the ml...]
>
> also including your other mail's additional content...
> > I have been able to do this by the following and repeating this for every
> > key + window combination. So in the above case there would be 8 blocks like
> > below. (4 combinations and 2 window periods for each combination)
> >
> > modelDataStream.keyBy("campaignId","adId")
> >         .timeWindow(Time.minutes(1))
> >         .trigger(CountTrigger.of(2))
> >         .reduce(..)
>
> As mentioned in my last email, I only see one way of reducing duplication
> (for the key combinations) but this involves more handling from your side and
> I'd probably not recommend this. Regarding the different windows, I do not see
> anything you could do differently here.
>
> Maybe Aljoscha (cc'd) has an idea of how to do this better
>
>
> Nico

Re: Aggregation by key hierarchy

Posted by Nico Kruber <ni...@data-artisans.com>.
[back to the ml...]

also including your other mail's additional content...
> I have been able to do this by the following and repeating this for every
> key + window combination. So in the above case there would be 8 blocks like
> below. (4 combinations and 2 window periods for each combination)
> 
> modelDataStream.keyBy("campaignId","adId")
>         .timeWindow(Time.minutes(1))
>         .trigger(CountTrigger.of(2))
>         .reduce(..)

As mentioned in my last email, I only see one way of reducing duplication 
(for the key combinations) but this involves more handling from your side and 
I'd probably not recommend this. Regarding the different windows, I do not see 
anything you could do differently here.
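
As an illustration of the "8 blocks" arithmetic quoted above (4 combinations and 2 window periods), here is a small plain-Java sketch that enumerates every (key combination, time bucket) pair from the sample model. It does not use Flink; the class name AggregationPlan, the method plan, and the "keys @ bucket" label format are hypothetical. Whether generating the keyBy/window chains in a loop like this suits a real job is an assumption and not something the thread confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AggregationPlan {
    /** Returns one label per (key combination, time bucket) pair, e.g. "campaignId,adId @ 1h". */
    static List<String> plan(List<List<String>> combinations, List<String> timeBuckets) {
        List<String> blocks = new ArrayList<>();
        for (List<String> combo : combinations) {
            for (String bucket : timeBuckets) {
                // In a job, each pair would correspond to one keyBy + window + reduce block.
                blocks.add(String.join(",", combo) + " @ " + bucket);
            }
        }
        return blocks;
    }

    public static void main(String[] args) {
        // Combinations and time buckets taken from the sample model in this thread.
        List<List<String>> combos = Arrays.asList(
            Arrays.asList("campaignId", "adId"),
            Arrays.asList("creativeId", "campaignId"),
            Arrays.asList("campaignId"),
            Arrays.asList("publisherId", "adOrderId", "campaignId"));
        List<String> blocks = plan(combos, Arrays.asList("1h", "1d"));
        System.out.println(blocks.size() + " blocks: " + blocks);
    }
}
```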

Maybe Aljoscha (cc'd) has an idea of how to do this better


Nico

On Monday, 14 August 2017 19:08:29 CEST Basanth Gowda wrote:
> Hi Nico,
> Thank you. This is pretty much what I am doing; I was wondering if there is
> a better way.
> 
> If there are 10 dimensions on which I want to aggregate with 2 windows,
> this would become about 20 different combinations.
> 
> Thank you
> Basanth


Re: Aggregation by key hierarchy

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Basanth,
Let's assume you have records of the form
Record = {timestamp, country, state, city, value}
Then you'd like to create aggregates, e.g. the average, for the following 
combinations?
1) avg per country
2) avg per state and country
3) avg per city and state and country

* You could create three streams and aggregate each individually:
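import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

DataStream<Record> ds = //...
// keyBy returns a KeyedStream; keeping that type leaves keyed windows available
KeyedStream<Record, Tuple> ds1 = ds.keyBy("country");
KeyedStream<Record, Tuple> ds2 = ds.keyBy("country","state");
KeyedStream<Record, Tuple> ds3 = ds.keyBy("country","state","city");
// + your aggregation per stream ds1, ds2, ds3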

You probably want to do different things for each of the resulting 
aggregations anyway, so having separate streams is probably right for you.

* Alternatively, you could go with ds1 only and compute the per-state (2) and 
per-city (3) aggregates yourself in a stateful aggregation function, e.g. in a 
MapState [1]. At the end of your aggregation window, you could then emit them 
with different keys so that they can be told apart.
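
A minimal sketch of the bookkeeping behind this alternative, in plain Java without Flink. An ordinary in-memory map stands in for MapState, and the class name HierarchyAggregator, its methods, and the "/"-separated key format are assumptions made for illustration. It tracks a running sum and count per hierarchy level and then produces one average per distinct key, which is roughly what would be emitted at the end of a window.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HierarchyAggregator {
    // key -> {sum, count}; a plain map standing in for Flink's MapState in this sketch
    private final Map<String, double[]> acc = new LinkedHashMap<>();

    /** Adds one record's value to the country, state and city level aggregates. */
    void add(String country, String state, String city, double value) {
        update(country, value);
        update(country + "/" + state, value);
        update(country + "/" + state + "/" + city, value);
    }

    private void update(String key, double value) {
        double[] sumCount = acc.computeIfAbsent(key, k -> new double[2]);
        sumCount[0] += value;
        sumCount[1] += 1;
    }

    /** Returns the average per hierarchy key; the key itself tells the levels apart. */
    Map<String, Double> averages() {
        Map<String, Double> out = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            out.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        HierarchyAggregator agg = new HierarchyAggregator();
        agg.add("US", "CA", "LA", 2.0);
        agg.add("US", "CA", "SF", 4.0);
        agg.add("US", "NY", "NYC", 6.0);
        System.out.println(agg.averages());
    }
}
```

In a real job the map would have to live in Flink-managed state to be covered by checkpoints and savepoints; this sketch only shows the aggregation logic.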


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html

On Sunday, 13 August 2017 23:13:00 CEST Basanth Gowda wrote:
> For example - this is a sample model from one of the Apache Apex
> presentations.
> 
> I would want to aggregate for different combinations and different time
> buckets. What is the best way to do this in Flink?
> 
> {"keys":[{"name":"campaignId","type":"integer"},
>  {"name":"adId","type":"integer"},
>  {"name":"creativeId","type":"integer"},
>  {"name":"publisherId","type":"integer"},
>  {"name":"adOrderId","type":"integer"}],
>  "timeBuckets":["1h","1d"],
>  "values":
>  [{"name":"impressions","type":"integer","aggregators":["SUM"]},
>  {"name":"clicks","type":"integer","aggregators":["SUM"]},
>  {"name":"revenue","type":"integer"}],
>  "dimensions":
>  [{"combination":["campaignId","adId"]},
>  {"combination":["creativeId","campaignId"]},
>  {"combination":["campaignId"]},
>  {"combination":["publisherId","adOrderId","campaignId"],
>  "additionalValues":["revenue:SUM"]}]
> }
> 
> 
> thank you,
> B


Re: Aggregation by key hierarchy

Posted by Basanth Gowda <ba...@gmail.com>.
For example - this is a sample model from one of the Apache Apex
presentations.

I would want to aggregate for different combinations and different time
buckets. What is the best way to do this in Flink?

{"keys":[{"name":"campaignId","type":"integer"},
 {"name":"adId","type":"integer"},
 {"name":"creativeId","type":"integer"},
 {"name":"publisherId","type":"integer"},
 {"name":"adOrderId","type":"integer"}],
 "timeBuckets":["1h","1d"],
 "values":
 [{"name":"impressions","type":"integer","aggregators":["SUM"]},
 {"name":"clicks","type":"integer","aggregators":["SUM"]},
 {"name":"revenue","type":"integer"}],
 "dimensions":
 [{"combination":["campaignId","adId"]},
 {"combination":["creativeId","campaignId"]},
 {"combination":["campaignId"]},
 {"combination":["publisherId","adOrderId","campaignId"],
 "additionalValues":["revenue:SUM"]}]
}


thank you,
B

On Sun, Aug 13, 2017 at 2:09 PM, Basanth Gowda <ba...@gmail.com> wrote:

> Hi,
> I want to aggregate hits by Country, State, City. I would have these as tags
> in my sample data.
>
> How would I do aggregation at different levels? The input data would be a
> single record.
>
> Should I do a flatMap transformation first and create 3 records from 1 input
> record, or is there a better way to do it?
>
> thank you,
> basanth
>