Posted to dev@beam.apache.org by Rui Wang <ru...@google.com> on 2018/10/23 22:46:36 UTC

[SQL] Investigation of missing/wrong session_end implementation in BeamSQL

Hi community,

In BeamSQL, SESSION window is supported in GROUP BY. Example query:

    "SELECT f_int2, COUNT(*) AS `getFieldCount`,"
        + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`, "
        + " SESSION_END(f_timestamp, INTERVAL '5' MINUTE) AS `window_end` "
        + " FROM TABLE_A"
        + " GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)";


However, I observed that SESSION_END (window_end) always returns the same
timestamp as SESSION_START (window_start), so BeamSQL is effectively missing
an implementation of SESSION_END. Below is my investigation of the root
cause and a proposed fix:

*Why are we not missing tumble_end and hop_end?*
Because when generating the logical plan, Calcite replaces tumble_start and
hop_start with a reference to the GROUP BY's TUMBLE/HOP, which is supposed
to return a timestamp. Calcite then replaces tumble_end and hop_end with
PLUS(timestamp reference, window_size as a constant). Since tumble and hop
have a fixed window size that appears as a constant in their function
signatures, Calcite can generate the PLUS in the logical plan. This means
that for tumble and hop we only need one timestamp (which represents
window_start in our implementation) to produce both window_start and
window_end in Projection.
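
To make the PLUS rewrite concrete, here is a minimal sketch in plain Java. It
is not Calcite or Beam code: the class and method names are hypothetical, and
it assumes tumbling windows aligned to the epoch. It only illustrates that,
with a constant window size, window_end follows from window_start alone.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only; not part of Calcite or BeamSQL.
final class TumbleEndSketch {
    // Start of the fixed-size window containing ts (assumes epoch-aligned windows).
    static Instant windowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = ts.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // What the PLUS node amounts to: window_end = window_start + constant size.
    static Instant windowEnd(Instant windowStart, Duration size) {
        return windowStart.plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(5);
        Instant start = windowStart(Instant.parse("2018-10-23T22:46:36Z"), size);
        System.out.println(start + " .. " + windowEnd(start, size));
    }
}
```

No such constant exists for a session window, which is why the same trick
cannot produce session_end.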

We emit the window_start timestamp as the result of the TUMBLE/HOP/SESSION
functions:
https://github.com/amaliujia/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java#L84



*Why are we missing session_end?*
Because Calcite does not know the window size of a session window, in the
logical plan it generates a reference to the GROUP BY's SESSION for
session_end, the same reference it generates for session_start. So in the
logical plan, session_start = session_end. And because BeamSQL does not
differentiate session from tumble and hop, we return the window start as
the result of the SESSION function, so in the final result we see
session_start = session_end.
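
The difference between the two cases can be sketched with a toy projection
over an aggregation output row. Everything here is a hypothetical stand-in
(ref and plus are not Calcite classes, and the row layout is assumed); the
point is only that the session projection has two references to the same
field.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-ins for plan expressions; these are not Calcite classes.
final class ProjectionSketch {
    // Reference to a field of the input row by position.
    static ToLongFunction<long[]> ref(int index) {
        return row -> row[index];
    }

    // Operand plus a constant number of milliseconds.
    static ToLongFunction<long[]> plus(ToLongFunction<long[]> operand, long constantMillis) {
        return row -> operand.applyAsLong(row) + constantMillis;
    }

    public static void main(String[] args) {
        // Assumed aggregation output: [f_int2, window function result (start, ms), COUNT(*)]
        long[] row = {7L, 1_000_000L, 3L};
        long fiveMinutes = 5 * 60 * 1000L;

        // TUMBLE: the end is the start plus a constant.
        ToLongFunction<long[]> tumbleStart = ref(1);
        ToLongFunction<long[]> tumbleEnd = plus(ref(1), fiveMinutes);

        // SESSION: no constant size, so both expressions reference the same field.
        ToLongFunction<long[]> sessionStart = ref(1);
        ToLongFunction<long[]> sessionEnd = ref(1);

        System.out.println("tumble:  " + tumbleStart.applyAsLong(row)
            + " .. " + tumbleEnd.applyAsLong(row));
        System.out.println("session: " + sessionStart.applyAsLong(row)
            + " .. " + sessionEnd.applyAsLong(row));
    }
}
```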

*Is this a Calcite bug?*
Yes and No.

Clearly Calcite shouldn't hide window_end by creating a wrong reference in
the logical plan. If Calcite does not know what session_end is, it should
at least keep it. Ideally Calcite would keep window_end in the logical plan
and let us decide what it means: a reference, a PLUS, or something else.

However, Calcite leaves room for us to add window_end back in the physical
plan nodes. For example, we can add window_end back in BeamAggregationRel.
We can probably change the reference for session_end into a reference to
our window_end in BeamAggregationRel.

*What is the fix?*
In BeamAggregationRel, we should add a window_end field right after the
window functions and emit the window_end timestamp for it. In Projection,
we should change window_end from a PLUS (for tumble and hop) or a wrong
reference (for session) to a correct reference to the newly added
window_end in Aggregation.
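
As a rough sketch of what the aggregation would emit for a session, here is
a plain Java model, not the BeamAggregationRel change itself. SessionRow and
sessions are hypothetical names, and the sketch assumes half-open windows
where each element initially covers [ts, ts + gap) and overlapping windows
are merged. The row carries both bounds, so a projection can reference
windowEnd directly instead of computing it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an aggregation that emits both window bounds.
final class SessionEndSketch {
    static final class SessionRow {
        final long windowStart;
        final long windowEnd;
        final long count;

        SessionRow(long windowStart, long windowEnd, long count) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    // Groups timestamps (ms) of one key into sessions separated by at least gapMillis.
    static List<SessionRow> sessions(long[] timestampsMillis, long gapMillis) {
        long[] sorted = timestampsMillis.clone();
        Arrays.sort(sorted);
        List<SessionRow> out = new ArrayList<>();
        if (sorted.length == 0) {
            return out;
        }
        long start = sorted[0];
        long end = sorted[0] + gapMillis;
        long count = 1;
        for (int i = 1; i < sorted.length; i++) {
            if (sorted[i] < end) {
                // Element falls inside the current session: extend its end.
                end = sorted[i] + gapMillis;
                count++;
            } else {
                out.add(new SessionRow(start, end, count));
                start = sorted[i];
                end = sorted[i] + gapMillis;
                count = 1;
            }
        }
        out.add(new SessionRow(start, end, count));
        return out;
    }

    public static void main(String[] args) {
        long gap = 5 * 60 * 1000L;
        for (SessionRow r : sessions(new long[] {0L, 60_000L, 120_000L, 600_000L}, gap)) {
            System.out.println(r.windowStart + " .. " + r.windowEnd + " count=" + r.count);
        }
    }
}
```

With the sample input, the first session ends at the last element's
timestamp plus the gap, which is different from its start.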

Jira: https://issues.apache.org/jira/browse/BEAM-5843


-Rui

Re: [SQL] Investigation of missing/wrong session_end implementation in BeamSQL

Posted by Kenneth Knowles <ke...@apache.org>.
This is some very cool digging, especially the forays into neighboring
Apache projects. We (and Flink) are clearly pushing at the edges of what
the original Calcite design foresaw. The naive insertion of Beam/Flink
style "group by window(s)" into SQL is showing a bit of wear.

Kenn
