Posted to user@flink.apache.org by Piyush Narang <p....@criteo.com> on 2019/03/12 02:53:31 UTC

Expressing Flink array aggregation using Table / SQL API

Hi folks,

I’m getting started with Flink and trying to figure out how to express an aggregation of rows into an array so that I can eventually sink the data into an AppendStreamTableSink.
My data looks something like this:
userId, clientId, eventType, timestamp, dataField

I need to compute some custom aggregations using a UDAF while grouping by userId, clientId over a sliding window (10 mins, triggered every 1 min). My first attempt is:
SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) as custom_aggregated
FROM my_kafka_stream_table
GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)

This query works as I expect it to: in every time window I end up with inserts for unique userId + clientId combinations. What I want to do, though, is generate a single row per userId in each time window, and that is what I’m struggling to express given the restriction that I want to sink the result to an AppendStreamTableSink. I was hoping to do something like this:

SELECT userId, COLLECT(client_custom_aggregated)
FROM
(
  SELECT userId, MAP[clientId, my_aggregation(eventType, `timestamp`, dataField)] as client_custom_aggregated
  FROM my_kafka_stream_table
  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
) GROUP BY userId

Unfortunately when I try this (and a few other variants), I run into the error, “AppendStreamTableSink requires that Table has only insert changes”. Does anyone know if there’s a way for me to compute my collect aggregation to produce one row per userId for a given time window?
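
For reference, a minimal sketch of how the first (per userId + clientId) query can be wired up and emitted as an append stream with the Java Table API; the class and method names are placeholders, and the table and UDAF registration is assumed to have happened elsewhere:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class WindowedAggregationSketch {

    // Assumes "my_kafka_stream_table" and the UDAF "my_aggregation" are already
    // registered on the given StreamTableEnvironment.
    public static DataStream<Row> perClientAggregation(StreamTableEnvironment tableEnv) {
        Table result = tableEnv.sqlQuery(
            "SELECT userId, clientId, my_aggregation(eventType, `timestamp`, dataField) AS custom_aggregated " +
            "FROM my_kafka_stream_table " +
            "GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)");

        // The group-window aggregation only produces inserts once a window fires,
        // so the result can be converted to an append stream (or written to an
        // AppendStreamTableSink) without retractions.
        return tableEnv.toAppendStream(result, Row.class);
    }
}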

Thanks,

-- Piyush


Re: Expressing Flink array aggregation using Table / SQL API

Posted by Kurt Young <yk...@gmail.com>.
Another option we have used before is to create a Retract/Upsert sink directly on top of the current Kafka sink and provide an option to simply drop the "delete" messages it receives. You could then do the dedup work in the downstream jobs that consume this result.
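
A minimal sketch of that idea at the DataStream level (standing in for a proper Retract/Upsert TableSink wrapper; the existing Kafka sink is passed in as a plain SinkFunction):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DropDeletesSketch {

    // Converts the (updating) query result to a retract stream, drops the
    // delete/retract messages, and forwards the remaining rows to the existing
    // Kafka sink. Deduplication is left to the jobs consuming that topic.
    public static void writeDroppingDeletes(StreamTableEnvironment tableEnv,
                                            Table result,
                                            SinkFunction<Row> existingKafkaSink) {
        DataStream<Tuple2<Boolean, Row>> retractStream =
            tableEnv.toRetractStream(result, Row.class);

        retractStream
            .filter(change -> change.f0)   // true = add/update, false = delete -> dropped
            .map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public Row map(Tuple2<Boolean, Row> change) {
                    return change.f1;      // keep only the row payload
                }
            })
            .addSink(existingKafkaSink);
    }
}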

Best,
Kurt


Re: Expressing Flink array aggregation using Table / SQL API

Posted by Piyush Narang <p....@criteo.com>.
Hi Kurt,

Thanks for getting back and explaining this. The behavior in this case makes more sense now after your explanation and reading the dynamic tables article. I was able to hook up the scoped aggregation like you suggested, so I have a workaround for now. The part I’m still trying to figure out is whether there’s any way to express the query I had so that it can sink to an append sink (apart from this custom aggregation). I tried including the time window in the outer query as well, but I was running into errors there. Or would you typically, in such scenarios, go the route of either a retractable sink or a sink that can update partial results by key?
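
For reference, the nested-window variant described above would look roughly like the following, assuming the inner window's time attribute is exposed with HOP_ROWTIME so the outer aggregation can window on it. This is a sketch of the shape only; it has not been verified against this Flink version, and "windowTime" and the outer TUMBLE size are placeholders:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class NestedWindowSketch {

    // Column and table names follow the earlier queries.
    public static Table perUserPerWindow(StreamTableEnvironment tableEnv) {
        return tableEnv.sqlQuery(
            "SELECT userId, " +
            "       TUMBLE_START(windowTime, INTERVAL '1' MINUTE) AS windowStart, " +
            "       COLLECT(client_custom_aggregated) AS client_aggregations " +
            "FROM ( " +
            "  SELECT userId, " +
            "         MAP[clientId, my_aggregation(eventType, `timestamp`, dataField)] AS client_custom_aggregated, " +
            "         HOP_ROWTIME(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) AS windowTime " +
            "  FROM my_kafka_stream_table " +
            "  GROUP BY userId, clientId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR) " +
            ") " +
            "GROUP BY userId, TUMBLE(windowTime, INTERVAL '1' MINUTE)");
    }
}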

Thanks,

-- Piyush


Re: Expressing Flink array aggregation using Table / SQL API

Posted by Kurt Young <yk...@gmail.com>.
Hi Piyush,

I think your second SQL is correct, but the problem you have encountered is that the outer aggregation (GROUP BY userId with COLLECT(client_custom_aggregated)) emits a result immediately whenever it receives a result from the inner aggregation. Hence Flink needs the sink to either:
1. be able to retract previously emitted results, i.e. the sink should be a `RetractStreamTableSink`, or
2. have something like a primary key so it can update results by key. In your case, userId would be the key.

I think you are trying to emit the result to an `AppendStreamTableSink`, which is why you see that error.
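
To illustrate option 1 at the DataStream level (a sketch, not a TableSink implementation): the result of the outer GROUP BY userId query can be converted to a retract stream, where the Boolean flag distinguishes adds/updates from retractions of previously emitted rows.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractStreamSketch {

    // userLevelResult is the outer "GROUP BY userId" query from this thread.
    // Each element is (true, row) for an add/update and (false, row) for a
    // retraction of a row emitted earlier; an AppendStreamTableSink has no way
    // to apply the retractions, hence the error.
    public static DataStream<Tuple2<Boolean, Row>> asRetractStream(
            StreamTableEnvironment tableEnv, Table userLevelResult) {
        return tableEnv.toRetractStream(userLevelResult, Row.class);
    }
}

For option 2, an UpsertStreamTableSink keyed on userId would overwrite the previous result for that user instead of retracting it.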

Best,
Kurt


Re: Expressing Flink array aggregation using Table / SQL API

Posted by Piyush Narang <p....@criteo.com>.
Thanks for getting back, Kurt. Yeah, this might be an option to try out. I was hoping there would be a way to express this directly in SQL though ☹.

-- Piyush


Re: Expressing Flink array aggregation using Table / SQL API

Posted by Kurt Young <yk...@gmail.com>.
Hi Piyush,

Could you try adding clientId into your aggregate function, tracking a map of <clientId, your_original_aggregation> inside the new aggregate function, and assembling whatever result you need when it emits?
The SQL would look like:

SELECT userId, some_aggregation(clientId, eventType, `timestamp`, dataField)
FROM my_kafka_stream_table
GROUP BY userId, HOP(`timestamp`, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
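
A toy sketch of what such a function could look like (the per-client logic here is just a count, standing in for the real aggregation, and the parameter types assume eventType and dataField are strings and `timestamp` is a SQL TIMESTAMP):

import java.sql.Timestamp;
import java.util.HashMap;
import org.apache.flink.table.functions.AggregateFunction;

public class PerClientAggregation extends AggregateFunction<String, HashMap<String, Long>> {

    @Override
    public HashMap<String, Long> createAccumulator() {
        return new HashMap<>();
    }

    // Invoked by Flink for every row of the (userId, window) group.
    public void accumulate(HashMap<String, Long> acc,
                           String clientId, String eventType, Timestamp timestamp, String dataField) {
        // Stand-in for the original per-client aggregation: count events per clientId.
        acc.merge(clientId, 1L, Long::sum);
    }

    @Override
    public String getValue(HashMap<String, Long> acc) {
        // Assemble whatever final representation is needed from the per-client
        // sub-aggregates when the group emits.
        return acc.toString();
    }
}

It could then be registered with something like tableEnv.registerFunction("some_aggregation", new PerClientAggregation()) and used as in the query above.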

Kurt

