You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Joseph Lorenzini <JL...@gohealth.com> on 2021/08/16 13:44:23 UTC

Windowed Aggregation With Event Time over a Temporary View

Hi all,



I am on Flink 1.12.3.



So here’s the scenario: I have a Kafka topic as a source, where each record
repsents a change to an append only audit log. The kafka record has the
following fields:



  * id (unique identifier for that audit log entry) 
  * operation id (is shared across multiple records)
  * operation (string)
  * start_ts (TIMESTAMP(3))
  * end_ts (TIMESTAMP(3))



I am trying to calculate the average item count and duration per operation. I
first converted the kafka source to an append only data stream and then I
attempted to run the following SQL:



      Table workLogTable = tableEnv.fromDataStream(workLogStream)      

      tableEnv.createTemporaryView("work_log", workLogTable);

         Table workLogCntTable = tableEnv.sqlQuery("select operation_id, operation, max(start_ts) as start_ts, max(end_ts) as end_ts, count(*) as item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time" +

            " FROM work_log GROUP BY operation_id, operation");

        tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);

        tableEnv.executeSql("select max(audit_ts), operation, avg(item_count) as average_item_count, AVG(end_ts - start_ts) as avg_duration from" +

            " work_log_cnt" +

            " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND), operation").print();



The problem I am having is that I am unable to preserve the event time between
the first view and the second. I am getting this error:



caused by: org.apache.calcite.runtime.CalciteContextException: From line 1,
column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of type
'$TUMBLE(<BIGINT>, <INTERVAL SECOND>)'. Supported form(s):
'$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'



My guess is that the max function in the first query is converting the event
time from DATETIME type to a BigInt. I am not sure how to apply an aggregate
to the event time in the first query such that the event time from the
original kafka stream can be used in the second view. Is there a way to make
this work?



Thanks,

Joe



Privileged/Confidential Information may be contained in this message. If you
are not the addressee indicated in this message (or responsible for delivery
of the message to such person), you may not copy or deliver this message to
anyone. In such case, you should destroy this message and kindly notify the
sender by reply email. Please advise immediately if you or your employer does
not consent to Internet email for messages of this kind. Opinions, conclusions
and other information in this message that do not relate to the official
business of my firm shall be understood as neither given nor endorsed by it.


Re: Windowed Aggregation With Event Time over a Temporary View

Posted by JING ZHANG <be...@gmail.com>.
Hi Joe,
>
> caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of
> type '$TUMBLE(<BIGINT>, <INTERVAL SECOND>)'. Supported form(s):
> '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'

The first parameter of Group Window Function [1] must be a field with time
attribute [2].
In you case, the first parameter of TUMBLE Window Function is `max_event_time`,
which is not a field with time attribute.
It originates from max(event_time) as max_event_time from the first
unbounded aggregate, please note that
the aggregate function result could not be a field with time attribute even
if it works on a time attribute (in your case,
`max` function works on event_time which is not a field with time attribute
either).

Is there a way to make this work?

First, we need define the time attribute when converting a DataStream To
Table[3], please refer document [3] for detailed information.

Secondly, we need propagate the time attribute to next sql part.
Unbounded aggregate could not propagate the time attribute. You could try
Window aggregate to propagate the time attribute,
For example, TUMBLE_ROWTIME, HOP_ROWTIME, SESSION_ROWTIME could return a
field with rowtime attribute [4].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#group-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/timely-stream-processing.html#notions-of-time-event-time-and-processing-time

[3]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#during-datastream-to-table-conversion
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#selecting-group-window-start-and-end-timestamps

Best,
JING ZHANG

Joseph Lorenzini <JL...@gohealth.com> 于2021年8月16日周一 下午9:45写道:

> Hi all,
>
>
>
> I am on Flink 1.12.3.
>
>
>
> So here’s the scenario: I have a Kafka topic as a source, where each
> record repsents a change to an append only audit log. The kafka record has
> the following fields:
>
>
>
>    - id (unique identifier for that audit log entry)
>    - operation id (is shared across multiple records)
>    - operation (string)
>    - start_ts (TIMESTAMP(3))
>    - end_ts (TIMESTAMP(3))
>
>
>
> I am trying to calculate the average item count and duration per
> operation. I first converted the kafka source to an append only data stream
> and then I attempted to run the following SQL:
>
>
>
>       Table workLogTable = tableEnv.fromDataStream(workLogStream)
>
>       tableEnv.createTemporaryView("work_log", workLogTable);
>
>          Table workLogCntTable = tableEnv.sqlQuery("select operation_id,
> operation, max(start_ts) as start_ts, max(end_ts) as end_ts, count(*) as
> item_count, max(audit_ts) as audit_ts, max(event_time) as max_event_time" +
>
>             " FROM work_log GROUP BY operation_id, operation");
>
>         tableEnv.createTemporaryView("work_log_cnt", workLogCntTable);
>
>         tableEnv.executeSql("select max(audit_ts), operation,
> avg(item_count) as average_item_count, AVG(end_ts - start_ts) as
> avg_duration from" +
>
>             " work_log_cnt" +
>
>             " GROUP BY TUMBLE(max_event_time, INTERVAL '1' SECOND),
> operation").print();
>
>
>
> The problem I am having is that I am unable to preserve the event time
> between the first view and the second. I am getting this error:
>
>
>
> caused by: org.apache.calcite.runtime.CalciteContextException: From line
> 1, column 139 to line 1, column 181: Cannot apply '$TUMBLE' to arguments of
> type '$TUMBLE(<BIGINT>, <INTERVAL SECOND>)'. Supported form(s):
> '$TUMBLE(<DATETIME>, <DATETIME_INTERVAL>)'
>
>
>
> My guess is that the max function in the first query is converting the
> event time from DATETIME type to a BigInt. I am not sure how to apply an
> aggregate to the event time in the first query such that the event time
> from the original kafka stream can be used in the second view. Is there a
> way to make this work?
>
>
>
> Thanks,
>
> Joe
>
>
> Privileged/Confidential Information may be contained in this message. If
> you are not the addressee indicated in this message (or responsible for
> delivery of the message to such person), you may not copy or deliver this
> message to anyone. In such case, you should destroy this message and kindly
> notify the sender by reply email. Please advise immediately if you or your
> employer does not consent to Internet email for messages of this kind.
> Opinions, conclusions and other information in this message that do not
> relate to the official business of my firm shall be understood as neither
> given nor endorsed by it.
>