Posted to user@flink.apache.org by Faye Pressly <Fa...@outlook.com> on 2020/08/07 19:28:27 UTC

GroupBy with count on a joined table only lets me write using toRetractStream

Hello,

I have a stream of events (coming from a Kinesis stream) of this form:

impressionId | advertId | variationName | eventType | eventTime

The end goal is to output, back onto a Kinesis stream, the count of events of type 'impression' and the count of events of type 'click'.

However, I need to drop (or ignore) click events that don't have a matching impressionId in an event of type 'impression' (in other words, I need to discard click events that have no corresponding impression).

This is how I tackled it:

// Convert the stream into a table
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table eventsTable = tEnv.fromDataStream(eventsStream, "impressionId, advertId, variationName, eventType, eventTime.rowtime");
tEnv.registerTable("Events", eventsTable);

// Create a table containing only click events
Table clicksTable = eventsTable
      .where("eventType = 'click'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      .select("impressionId as clickImpId, advertId as clickAdvertId, variationName as clickVariationName, minuteWindow.rowTime as clickMinute");

// Create a table containing only impression events
Table impressionsTable = eventsTable
      .where("eventType = 'impression'")
      .window(Slide.over("24.hour").every("1.minute").on("eventTime").as("minuteWindow"))
      .groupBy("impressionId, advertId, variationName, minuteWindow")
      .select("impressionId as impImpressionId, advertId as impAdvertId, variationName as impVariationName, minuteWindow.rowTime as impMinute");

// Left-join the impressions with the clicks on impressionId and the window time,
// then group to count all clicks that have a matching impression (i.e. rows where clickAdvertId is not null)
Table allImpressionTable = impressionsTable
      .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
      .groupBy("impAdvertId, impVariationName, impMinute")
      .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
       .where("clickCount != null");
[.... same logic to count impressions]
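For reference, here is a plain-Java sketch (no Flink dependency) of what the left join plus count above is intended to compute. It assumes a small in-memory batch and ignores windows and event time entirely; MatchedClickCount, its Event type, and countMatchedClicks are hypothetical names for illustration, not Flink API. It shows two things: a click whose impressionId has no impression never reaches the result, because the join is driven from the impression side, and a count over the click column skips the nulls that the left outer join produces for impressions without a click.

```java
import java.util.*;

public class MatchedClickCount {
    // Hypothetical flattened event; field names mirror the schema in the question.
    static final class Event {
        final String impressionId, advertId, eventType;

        Event(String impressionId, String advertId, String eventType) {
            this.impressionId = impressionId;
            this.advertId = advertId;
            this.eventType = eventType;
        }
    }

    // Emulates: impressions LEFT JOIN clicks ON impressionId,
    // then GROUP BY advertId with COUNT(clickId).
    static Map<String, Long> countMatchedClicks(List<Event> events) {
        // A set, so repeated clicks on one impression count once
        // (similar to grouping clicksTable by impressionId).
        Set<String> clickedIds = new HashSet<>();
        for (Event e : events) {
            if (e.eventType.equals("click")) clickedIds.add(e.impressionId);
        }
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            if (!e.eventType.equals("impression")) continue; // join is driven from impressions
            // Left join: the matching click id, or null when there is none.
            String clickId = clickedIds.contains(e.impressionId) ? e.impressionId : null;
            // COUNT(clickId) adds 1 only for non-null values.
            counts.merge(e.advertId, clickId != null ? 1L : 0L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("i1", "adA", "impression"),
            new Event("i1", "adA", "click"),
            new Event("i2", "adA", "impression"), // impression that was never clicked
            new Event("i3", "adB", "click"));     // click without impression: dropped
        System.out.println(countMatchedClicks(events)); // prints {adA=1}
    }
}
```

Here adB is absent from the result because its only event is an unmatched click, and the unclicked impression i2 contributes 0 to adA.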

To debug and check that the counts are correct, I usually use "tEnv.toAppendStream(allImpressionTable, Result.class).print()", and I can use that newly created stream to send the results back to a Kinesis stream.

However, I get an error saying that I cannot use toAppendStream and must use toRetractStream instead. That does work, and I can see that the counts in the output are correct, but I don't understand how to use the result in this new stream: it contains multiple rows flagged "true"/"false", and the correct count is usually the last entry flagged "true".
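The multiple "true"/"false" rows are the retract encoding of a result that keeps being updated. Below is a plain-Java sketch (hypothetical class and method names, no Flink dependency) of the changelog that a continuously updating count per key would produce under that encoding, assuming input rows arrive one at a time: the first row for a key is only added (true), and every later change first retracts the previous count (false) and then adds the new one (true). That is why the latest "true" row for a key carries its current count.

```java
import java.util.*;

public class RetractChangelogDemo {
    // One retract-stream message: the flag plus the row (here just key and count).
    static final class Change {
        final boolean isAdd;
        final String key;
        final long count;

        Change(boolean isAdd, String key, long count) {
            this.isAdd = isAdd;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return "(" + isAdd + "," + key + "," + count + ")";
        }
    }

    // Emulates GROUP BY key with COUNT(*) over an unbounded input:
    // each new row retracts the old result for its key, then adds the updated one.
    static List<Change> countChangelog(List<String> keys) {
        Map<String, Long> current = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (String k : keys) {
            Long old = current.get(k);
            if (old != null) out.add(new Change(false, k, old)); // retract previous count
            long updated = (old == null) ? 1L : old + 1L;
            current.put(k, updated);
            out.add(new Change(true, k, updated));               // add new count
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(countChangelog(List.of("adA", "adA", "adB")));
        // prints [(true,adA,1), (false,adA,1), (true,adA,2), (true,adB,1)]
    }
}
```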

I have a few questions:

1) I'm very new to Flink and would like to know whether my approach to filtering out unmatched events is the right one (stream -> table and joins -> stream).
Is there a much easier way to do this? Is it perhaps possible to filter these events directly in the DataStream?


2) How do I use the retract stream? How do I use it to send only the final counts to a sink, rather than all of the "true"/"false" insert/delete rows?


Thank you!

Re: GroupBy with count on a joined table only lets me write using toRetractStream

Posted by godfrey he <go...@gmail.com>.
Hi Faye,

1) In your SQL, different events belong to different groups, so it seems hard
to extract a global filter into the DataStream.
2) AFAIK, you can just drop the retract messages (those whose flag is false),
and then convert the retract stream to an append stream.
 The downstream job then needs to deduplicate the records, just like [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
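A plain-Java sketch of the two steps described in point 2, using hypothetical names and an in-memory list in place of the real stream: drop every message whose flag is false, then deduplicate by keeping only the most recent added row per key. In Flink itself, the first step roughly corresponds to filtering the DataStream<Tuple2<Boolean, T>> returned by toRetractStream on its Boolean field f0; the keep-latest step has to happen downstream, since an append-only sink such as a Kinesis stream still receives every intermediate count.

```java
import java.util.*;

public class DropRetractionsDemo {
    // Hypothetical retract-stream message: flag plus a (key, count) row.
    static final class Change {
        final boolean isAdd;
        final String key;
        final long count;

        Change(boolean isAdd, String key, long count) {
            this.isAdd = isAdd;
            this.key = key;
            this.count = count;
        }
    }

    static Map<String, Long> latestPerKey(List<Change> changelog) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Change c : changelog) {
            if (!c.isAdd) continue;      // step 1: drop retraction messages
            latest.put(c.key, c.count);  // step 2: a later row for the same key overwrites the earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Change> changelog = List.of(
            new Change(true, "adA", 1),
            new Change(false, "adA", 1),
            new Change(true, "adA", 2),
            new Change(true, "adB", 1));
        System.out.println(latestPerKey(changelog)); // prints {adA=2, adB=1}
    }
}
```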

Best,
Godfrey

On Sat, Aug 8, 2020 at 3:30 AM, Faye Pressly <Fa...@outlook.com> wrote:

> Sorry, just noticed I made a typo in the last table (it should be
> clickAdvertId != null instead of clickCount != null)
>
> Table allImpressionTable = impressionsTable
>       .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
>       .groupBy("impAdvertId, impVariationName, impMinute")
>       .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
>        .where("clickAdvertId != null");

Re: GroupBy with count on a joined table only lets me write using toRetractStream

Posted by Faye Pressly <Fa...@outlook.com>.
Sorry, just noticed I made a typo in the last table (it should be clickAdvertId != null instead of clickCount != null)

Table allImpressionTable = impressionsTable
      .leftOuterJoin(clicksTable, "clickImpId = impImpressionId && clickMinute = impMinute")
      .groupBy("impAdvertId, impVariationName, impMinute")
      .select("impAdvertId, impVariationName, clickAdvertId.count as clickCount, impMinute")
       .where("clickAdvertId != null");
