Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2020/12/08 15:51:16 UTC

How can I optimize joins or cache misses in SQL API?

scenario:

a Kafka stream enriched with tables in PostgreSQL

Let's pretend that Postgres has organizations, departments, and
persons tables, and we want to join the full name onto the Kafka
stream that carries the person id.  I also want to detect when the
person id is missing.

This requires a left join.

SELECT o.id, d.id, p.fullname, k.ssn, SUM(k.amount)
FROM purchases k
JOIN organizations o ON o.code = k.organization
JOIN departments d ON d.code = k.department
LEFT JOIN persons FOR SYSTEM_TIME AS OF k.procTime AS p
ON p.department_id = d.id AND p.ssn = k.ssn
GROUP BY
TUMBLE(k.procTime, INTERVAL '3' MINUTE), o.id, d.id, p.fullname, k.ssn

(Note that the ssn predicate has to live in the ON clause; putting it
in a WHERE clause would filter out the NULL rows and defeat the LEFT
JOIN.)

Let's say that the TTL for organizations and departments is 12 months,
but for persons
it is 1 month.

observations:

If six unique people enter the kafka topic, then that will issue six
separate queries to the database of the form:

SELECT id, ssn, fullname, dob FROM persons WHERE department_id = $1 AND ssn = $2

However, since this is a tumbling window, it would be more efficient
to issue one query with the six parameter pairs in an IN clause.  Example:

SELECT id, ssn, fullname, dob FROM persons WHERE (department_id, ssn)
IN (($1,$2), ($3,$4), ($5,$6), ($7,$8), ($9,$10), ($11,$12))

Q: Is there a way to control that? I don't want the N + 1 query problem.

Q: Are these queries performed asynchronously?  If there were 200,000
unique persons, I would not want 200,000 synchronous queries.

Q: Is there a way to preload the persons table, since it changes only
about once every two weeks, and then do a LEFT JOIN on it?

Let's say that the person does not exist. I am impressed that Flink
caches that a person does not exist.  However, I want to cache if a
person exists for a month, but if the person does not exist, I only
want to remember that for a day.

Q: Is there a way to control a shorter cache time for non-existent items?

I really like the expressiveness and succinctness of the SQL API in
Flink; however, I am worried that I will need to use the DataStream
API in order to control the scenarios above.

I appreciate any advice, thank you.

Re: How can I optimize joins or cache misses in SQL API?

Posted by Danny Chan <da...@apache.org>.
Hi, Marco Villalobos ~

It's nice to see that you choose the SQL API which is more concise and
expressive.

To answer some of your questions:

> Q: Is there a way to control that? I don't want the N + 1 query problem.

No. SQL is evaluated row by row; there may be some internal
optimizations that buffer the data first, but there is no logic to
combine the ad-hoc queries into one IN clause.

> Q: Is there a way to preload persons table, since it changes only
about once every two weeks and then do a LEFT JOIN on it?

Yes, the temporal table has configuration options to cache the data,
but by default this feature is disabled. [1]
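
For reference, the lookup cache is enabled when you declare the JDBC
table, via the options documented in [1]; here is a sketch (the URL,
database name, and column list are placeholders, not taken from your
setup):

```sql
CREATE TABLE persons (
  id BIGINT,
  ssn STRING,
  fullname STRING,
  dob DATE
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',   -- placeholder URL
  'table-name' = 'persons',
  -- the cache stays disabled unless both options below are set
  'lookup.cache.max-rows' = '100000',
  'lookup.cache.ttl' = '1 h'
);
```

With the cache on, a lookup first checks the cache and only queries
Postgres on a miss, which should also reduce the query volume you
observed.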

> Q: Is there a way to control a shorter cache time for non-existent items?

You can configure the state TTL of a stream-stream join through [2],
or modify the temporal cache TTL through the options above.
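
Since the cache TTL is a per-table option, the asymmetry you describe
(long TTLs for organizations/departments, a shorter one for persons)
can be expressed by giving each lookup table its own TTL; a sketch
using the option names from [1] (connection details are placeholders):

```sql
-- organizations and departments: long-lived cache
CREATE TABLE departments (
  id BIGINT,
  code STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',   -- placeholder URL
  'table-name' = 'departments',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '365 d'
);

-- persons: same options, but with a shorter TTL
-- 'lookup.cache.ttl' = '30 d'
```

Note, however, that as far as I know the cache uses a single TTL for
both existing and missing keys, so the shorter TTL for non-existent
persons that you ask about is not directly expressible with these
options alone.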

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#connector-options
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/query_configuration.html#idle-state-retention-time
