Posted to user@flink.apache.org by John Smith <my...@gmail.com> on 2021/09/22 08:44:46 UTC

Stream join with (changing) dimension in Kafka

Hi,

I'm trying to use a temporal join in the Table API to enrich a stream of pageview
events with a slowly changing dimension of user information.
The pageview events are in a Kafka topic called *pageviews*. The user
information is in a Kafka topic called *users*, keyed by *userid*; whenever a
user is updated, a new event is appended to the *users* topic.
I declare a table on the pageview topic with the watermark strategy
*WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1))* and a
table on the users topic with the watermark strategy
*WatermarkStrategy.forMonotonousTimestamps()*.
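To see what these two strategies imply for a join, here is a minimal plain-Java model (no Flink dependency; the class WatermarkModel is hypothetical). It assumes the periodic watermark is the highest timestamp seen so far minus the out-of-orderness bound minus 1 ms, that forMonotonousTimestamps() behaves like a bound of zero, and that a two-input operator such as a join only advances to the smaller of its two input watermarks.

```java
// Hypothetical plain-Java model of the two watermark strategies; not Flink code.
// Assumption: watermark = max timestamp seen - out-of-orderness bound - 1 ms.
class WatermarkModel {
    private final long boundMillis;
    private long maxTimestamp;

    WatermarkModel(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Called for every record; only a new maximum timestamp moves the watermark. */
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** The watermark this source would emit right now. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    /** Assumption: a two-input operator forwards the smaller of its input watermarks. */
    static long combined(long left, long right) {
        return Math.min(left, right);
    }
}
```

Under these assumptions the users-side watermark only moves when a new user record arrives, so the combined watermark stalls whenever the users topic is quiet, however many pageviews come in.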

Here is the code for the temporal join:

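import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.functions.TemporalTableFunction;

// getPageview/getUsers (not shown) declare the Kafka-backed tables described above.
Table pv = getPageview(env, tableEnv, properties)
        .select(
                $("timestamp").as("pv_ts"),
                $("userid").as("pv_userid"),
                $("pageid").as("pv_pageid"));

Table usr = getUsers(env, tableEnv, properties)
        .select(
                $("timestamp").as("u_ts"),
                $("userid").as("u_userid"),
                $("regionid"),
                $("gender"));

// Versioned view of the users: time attribute u_ts, primary key u_userid.
TemporalTableFunction userFunction =
        usr.createTemporalTableFunction($("u_ts"), $("u_userid"));
tableEnv.createTemporaryFunction("usrFun", userFunction);

// For each pageview, look up the user version valid at pv_ts.
Table enrichedPV = pv.joinLateral(
        call("usrFun", $("pv_ts")),
        $("pv_userid").isEqual($("u_userid")));

enrichedPV.execute().print();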

When I run this, I get results like the following, which are only emitted
when new messages are pushed to both the pageviews and users topics:

+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| op |                   pv_ts | pv_userid | pv_pageid |                    u_ts | u_userid | regionid | gender |
+----+-------------------------+-----------+-----------+-------------------------+----------+----------+--------+
| +I | 2021-09-22 08:28:05.346 |    User_8 |   Page_99 | 2021-09-22 08:28:04.769 |   User_8 | Region_1 |  OTHER |
| +I | 2021-09-22 08:28:12.377 |    User_3 |   Page_88 | 2021-09-22 08:28:08.823 |   User_3 | Region_8 | FEMALE |
| +I | 2021-09-22 08:28:15.385 |    User_7 |   Page_73 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:16.391 |    User_7 |   Page_97 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:17.396 |    User_7 |   Page_43 | 2021-09-22 08:28:07.817 |   User_7 | Region_9 |  OTHER |
| +I | 2021-09-22 08:28:18.400 |    User_6 |   Page_43 | 2021-09-22 08:28:15.854 |   User_6 | Region_5 |  OTHER |
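The rows above are consistent with "as-of" lookup semantics: each pageview is paired with the latest record for the same user whose u_ts is not later than its pv_ts. For example, the three User_7 pageviews between 08:28:15.385 and 08:28:17.396 all pick up the User_7 record from 08:28:07.817. A minimal plain-Java sketch of that lookup, assuming a sorted map of versions per key (VersionedTable is a hypothetical name, not Flink's implementation):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of the lookup behind a temporal table function; not Flink code.
// For a probe time t and key k it returns the latest version of k valid at or before t.
class VersionedTable<V> {
    private final Map<String, NavigableMap<Long, V>> versionsByKey = new HashMap<>();

    /** Records a new version of `key` that becomes valid at `validFrom` (millis). */
    void put(String key, long validFrom, V value) {
        versionsByKey.computeIfAbsent(key, k -> new TreeMap<>()).put(validFrom, value);
    }

    /** Returns the version of `key` valid at `time`, or null if none exists yet. */
    V asOf(String key, long time) {
        NavigableMap<Long, V> versions = versionsByKey.get(key);
        if (versions == null) {
            return null;
        }
        // floorEntry picks the greatest validFrom that is <= time.
        Map.Entry<Long, V> entry = versions.floorEntry(time);
        return entry == null ? null : entry.getValue();
    }
}
```

An inner temporal join drops a pageview when asOf returns null, which is why a pageview for a user with no earlier record produces no row.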

However, I want a result to be emitted whenever a new pageview message
arrives, without waiting on the user side.
Is there an obvious mistake in my code that prevents this behavior?
Also, is there a code example I can try where the main stream is enriched
whenever a new event arrives, regardless of whether there are new events on
the dimension side? The Flink documentation on temporal joins, especially
for the Table API, is really thin!

Thanks in advance.

Re: Stream join with (changing) dimension in Kafka

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

What type of time attribute is u_ts? If it is an event-time attribute, then
the query you're running is an event-time temporal table join, which holds
back each output record until the watermarks of both inputs have risen
above that record's row time.
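A simplified plain-Java model of that gating. EventTimeJoinGate is hypothetical and ignores versioning and state cleanup; it assumes a buffered pageview is released once its row time is at or below the minimum of the two input watermarks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of event-time temporal join gating; not Flink code.
// Pageview row times are buffered until min(pageview watermark, users watermark) reaches them.
class EventTimeJoinGate {
    private final List<Long> buffered = new ArrayList<>();
    private long probeWatermark = Long.MIN_VALUE; // pageviews side
    private long buildWatermark = Long.MIN_VALUE; // users side

    void onPageview(long rowTime) {
        buffered.add(rowTime);
    }

    /** Watermark update from the pageviews input; returns the rows it releases. */
    List<Long> onProbeWatermark(long wm) {
        probeWatermark = Math.max(probeWatermark, wm);
        return release();
    }

    /** Watermark update from the users input; returns the rows it releases. */
    List<Long> onBuildWatermark(long wm) {
        buildWatermark = Math.max(buildWatermark, wm);
        return release();
    }

    int pending() {
        return buffered.size();
    }

    private List<Long> release() {
        long combined = Math.min(probeWatermark, buildWatermark);
        List<Long> out = new ArrayList<>();
        Iterator<Long> it = buffered.iterator();
        while (it.hasNext()) {
            long t = it.next();
            if (t <= combined) {
                out.add(t);
                it.remove();
            }
        }
        return out;
    }
}
```

In the setup from the question, the users-side watermark only advances when new user records arrive, which would explain why results appear only when both topics receive messages.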

Since the dimension table changes quite slowly, I would recommend using a
processing-time temporal table join (also called a lookup join) instead.
See this excerpt from a Scala Table API unit test, which declares both
tables with processing-time attributes:

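// Both tables get a processing-time attribute ('o_proctime and 'proctime)
// instead of an event-time one, so the join does not wait on watermarks.
val proctimeOrders: Table = util.addDataStream[(Long, String)](
  "ProctimeOrders", 'o_amount, 'o_currency, 'o_proctime.proctime)
val proctimeRatesHistory: Table = util.addDataStream[(String, Int)](
  "ProctimeRatesHistory", 'currency, 'rate, 'proctime.proctime)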

The Table API documentation does indeed lack a lot of information. I would
recommend trying the SQL API instead, which is a superset of the Table API
and is more expressive and easier to understand.
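The processing-time alternative suggested above can be modelled in plain Java as well. This sketch assumes the join simply keeps the latest record per user and looks it up the moment each pageview arrives; ProcTimeEnricher and its comma-separated output are hypothetical, and unmatched pageviews are dropped as in an inner join.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of processing-time enrichment; not Flink code.
// No watermark is involved: each pageview is joined with whatever is current on arrival.
class ProcTimeEnricher {
    private final Map<String, String> currentRegionByUser = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** A user update overwrites the current value for that user. */
    void onUser(String userId, String regionId) {
        currentRegionByUser.put(userId, regionId);
    }

    /** A pageview is emitted immediately if its user is known; otherwise it is dropped. */
    void onPageview(String userId, String pageId) {
        String region = currentRegionByUser.get(userId);
        if (region != null) {
            output.add(pageId + "," + userId + "," + region);
        }
    }

    List<String> output() {
        return output;
    }
}
```

The trade-off is determinism: the result depends on which user record happened to be current when the pageview was processed, so replaying the same topics can yield different enrichments, whereas the event-time join always picks the version valid at pv_ts.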

