You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by liuxiangcao <xi...@gmail.com> on 2022/04/15 19:06:40 UTC

How to define event time and Watermark for intermediary joined table in FlinkSQL?

Hi Flink community,

*Here is the context: *
Theoretically, I would like to write following query but it won't work
since we can only define the WATERMARK in a table DDL:

INSERT into tableC
select tableA.field1
         SUM(1) as `count`,
         time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
         WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
from tableA join tableB
on tableA.joinCol == tableB.joinCol
group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
(note: getEventTimeInNS is a UDF that calculates event time using
tableA.timestamp and tableB.timestamp)


so I have to define a intermediary table to store the results from joining,
and defining event time and watermark in the table DDL, then performs
tumbling windowing on the intermediary table:

CREATE TABLE IntermediaryTable (
   field1,
  `eventTimestampInNanoseconds`  BIGINT,
   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'IntermediaryTable',
  'properties.bootstrap.servers' = 'xxxxxx',
  'properties.group.id' = 'contextevent-streaming-sql',
  'format' = 'avro'
);

INSERT INTO IntermediaryTable
select tableA.field1
          tableB.field2,
          getEventTimeInNS(tableA.timestamp, tableB.timestamp),
from tableA join tableB
on tableA.joinCol == tableB.joinCol;

Then, I can perform tumbling window aggregation on the IntermediaryTable:

INSERT INTO countTable
(select event.field1
        SUM(1) as `count`
 from IntermediaryTable event
 GROUP BY
  TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
  event.field1
);


This is not convenient because the IntermediaryTable writes to another
kafka topic that is only used by the tumbling window aggregation. When I
try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
END;", it will fail complaining the topic does not exist. I either have to
first create this kafka topic beforehand, or run a separate job to INSERT
INTO IntermediaryTable.

In Java DataStream API, you can easily do so within flink topology without
having to create a separate kafka topic:

final DataStream<xxx> joinedStream =
                 StreamA.join(StreamB)
                 .where(xxxx)
                 .equalTo(xxxx)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                 .apply(aggregation);


*Question:*
Does the Flink community have any suggestions on how to do this in FlinkSQL
in a friendly way? Would it be a good idea for FlinkSQL to support defining
eventtime and watermark on the fly without a table ddl? Would love to hear
any suggestions. Thanks a lot in advance.

-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re: Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

Posted by liuxiangcao <xi...@gmail.com>.
Thanks for the jira link.

Actually my comment in the initial email "In Java DataStream API, you can
easily do so within flink topology without having to create a separate
kafka topic: "  is incorrect.

I took a closer look and realized Flink Java DataStream also does not
support redefining TimestampAssigner on a JoinedStreams
<https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java>.
It will simply use the event timestamp and watermark from the input
streams.

On Fri, Apr 29, 2022 at 7:35 AM Xuyang <xy...@163.com> wrote:

> I think it's not a good idea to defining a watermark on a view, because
> currently the view is only a set of SQL query text in Flink , and a query
> should not contain a watermark definition. You can see the discussion here:
> https://issues.apache.org/jira/browse/FLINK-22804
> Maybe you can open a jira again to discuss the behavior you expect.
>
> 在 2022-04-29 13:30:34,"liuxiangcao" <xi...@gmail.com> 写道:
>
> Hi Shengkai,
>
> Thank you for the reply.
>
> The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
> calculate the true event time for streamB events.
>
> For illustrating purpose, we can consider it to be like this:
>
> public Long eval(
>         Long baseTimeStampFromA,
>         Long timestampA
>         Long timestampB) {
>   return baseTimeStampFromA + timestampB - timestampA;
> }
>
> Basically I need to redefine the event timestamp and watermark for the
> output stream of a join operator.
>
> You are right. Ideally I hope FlinkSQL can support defining a watermark on
> a view.  Do you know if this was discussed in the Flink community before?
> Wondering whether this may be supported in future.
>
> On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fs...@gmail.com> wrote:
>
>> Hi,
>>
>> The watermark of the join operator is the minimum of the watermark of the
>> input streams.
>>
>> ```
>> JoinOperator.watermark = min(left.watermark, right.watermark);
>> ```
>>
>> I think it's enough for most cases.  Could you share more details about
>> the logic in the UDF getEventTimeInNS?
>>
>> I think the better solution comparing to the intermediate table is to
>> define the watermark on the VIEW. But Flink doesn't support it now.
>>
>> Best,
>> Shengkai
>>
>>
>>
>>
>> liuxiangcao <xi...@gmail.com> 于2022年4月16日周六 03:07写道:
>>
>>> Hi Flink community,
>>>
>>> *Here is the context: *
>>> Theoretically, I would like to write following query but it won't work
>>> since we can only define the WATERMARK in a table DDL:
>>>
>>> INSERT into tableC
>>> select tableA.field1
>>>          SUM(1) as `count`,
>>>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol
>>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>>> (note: getEventTimeInNS is a UDF that calculates event time using tableA.timestamp and tableB.timestamp)
>>>
>>>
>>> so I have to define a intermediary table to store the results from
>>> joining, and defining event time and watermark in the table DDL, then
>>> performs tumbling windowing on the intermediary table:
>>>
>>> CREATE TABLE IntermediaryTable (
>>>    field1,
>>>   `eventTimestampInNanoseconds`  BIGINT,
>>>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>>>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>>> ) WITH (
>>>   'connector' = 'kafka',
>>>   'topic' = 'IntermediaryTable',
>>>   'properties.bootstrap.servers' = 'xxxxxx',
>>>   'properties.group.id' = 'contextevent-streaming-sql',
>>>   'format' = 'avro'
>>> );
>>>
>>> INSERT INTO IntermediaryTable
>>> select tableA.field1
>>>           tableB.field2,
>>>           getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>> from tableA join tableB
>>> on tableA.joinCol == tableB.joinCol;
>>>
>>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>>
>>> INSERT INTO countTable
>>> (select event.field1
>>>         SUM(1) as `count`
>>>  from IntermediaryTable event
>>>  GROUP BY
>>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>>   event.field1
>>> );
>>>
>>>
>>> This is not convenient because the IntermediaryTable writes to another
>>> kafka topic that is only used by the tumbling window aggregation. When I
>>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>>> END;", it will fail complaining the topic does not exist. I either have to
>>> first create this kafka topic beforehand, or run a separate job to INSERT
>>> INTO IntermediaryTable.
>>>
>>> In Java DataStream API, you can easily do so within flink topology
>>> without having to create a separate kafka topic:
>>>
>>> final DataStream<xxx> joinedStream =
>>>                  StreamA.join(StreamB)
>>>                  .where(xxxx)
>>>                  .equalTo(xxxx)
>>>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>>                  .apply(aggregation);
>>>
>>>
>>> *Question:*
>>> Does the Flink community have any suggestions on how to do this in
>>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
>>> defining eventtime and watermark on the fly without a table ddl? Would love
>>> to hear any suggestions. Thanks a lot in advance.
>>>
>>> --
>>> Best Wishes & Regards
>>> Shawn Xiangcao Liu
>>>
>>
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>
>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re:Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

Posted by Xuyang <xy...@163.com>.
I think it's not a good idea to defining a watermark on a view, because currently the view is only a set of SQL query text in Flink , and a query should not contain a watermark definition. You can see the discussion here: https://issues.apache.org/jira/browse/FLINK-22804
Maybe you can open a jira again to discuss the behavior you expect.



在 2022-04-29 13:30:34,"liuxiangcao" <xi...@gmail.com> 写道:

Hi Shengkai, 


Thank you for the reply. 


The UDF getEventTimeInNS uses timestamps of both streamA and streamB to calculate the true event time for streamB events. 


For illustrating purpose, we can consider it to be like this: 
public Long eval(
        Long baseTimeStampFromA,
Long timestampA
        Long timestampB) {
return baseTimeStampFromA + timestampB - timestampA;
}
Basically I need to redefine the event timestamp and watermark for the output stream of a join operator. 


You are right. Ideally I hope FlinkSQL can support defining a watermark on a view.  Do you know if this was discussed in the Flink community before? Wondering whether this may be supported in future. 



On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fs...@gmail.com> wrote:

Hi, 


The watermark of the join operator is the minimum of the watermark of the input streams. 


```
JoinOperator.watermark = min(left.watermark, right.watermark);
```


I think it's enough for most cases.  Could you share more details about the logic in the UDF getEventTimeInNS? 

I think the better solution comparing to the intermediate table is to define the watermark on the VIEW. But Flink doesn't support it now.


Best,
Shengkai








liuxiangcao <xi...@gmail.com> 于2022年4月16日周六 03:07写道:

Hi Flink community, 


Here is the context: 
Theoretically, I would like to write following query but it won't work since we can only define the WATERMARK in a table DDL:

INSERT into tableC
select tableA.field1 
SUM(1) as `count`, 
         time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
         WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
from tableA join tableB
on tableA.joinCol == tableB.joinCol
group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
(note: getEventTimeInNS is a UDF that calculates event time using tableA.timestamp and tableB.timestamp)


so I have to define a intermediary table to store the results from joining, and defining event time and watermark in the table DDL, then performs tumbling windowing on the intermediary table: 
CREATE TABLE IntermediaryTable (
   field1,
  `eventTimestampInNanoseconds`  BIGINT,
   time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
   WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'IntermediaryTable',
'properties.bootstrap.servers' = 'xxxxxx',
'properties.group.id' = 'contextevent-streaming-sql',
'format' = 'avro'
);
INSERT INTO IntermediaryTable
select tableA.field1
          tableB.field2,
          getEventTimeInNS(tableA.timestamp, tableB.timestamp),
from tableA join tableB
on tableA.joinCol == tableB.joinCol;
Then, I can perform tumbling window aggregation on the IntermediaryTable:
INSERT INTO countTable
(select event.field1
SUM(1) as `count`
from IntermediaryTable event
GROUP BY
TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
  event.field1
);


This is not convenient because the IntermediaryTable writes to another kafka topic that is only used by the tumbling window aggregation. When I try to group the two INSERT INTO statements within "BEGIN STATEMENT SET; END;", it will fail complaining the topic does not exist. I either have to first create this kafka topic beforehand, or run a separate job to INSERT INTO IntermediaryTable. 


In Java DataStream API, you can easily do so within flink topology without having to create a separate kafka topic: 
final DataStream<xxx> joinedStream =
                 StreamA.join(StreamB)
                 .where(xxxx)
                 .equalTo(xxxx)
                 .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
                 .apply(aggregation);


Question:
Does the Flink community have any suggestions on how to do this in FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support defining eventtime and watermark on the fly without a table ddl? Would love to hear any suggestions. Thanks a lot in advance. 


--

Best Wishes & Regards
Shawn Xiangcao Liu





--

Best Wishes & Regards
Shawn Xiangcao Liu

Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

Posted by liuxiangcao <xi...@gmail.com>.
Hi Shengkai,

Thank you for the reply.

The UDF getEventTimeInNS uses timestamps of both streamA and streamB to
calculate the true event time for streamB events.

For illustrating purpose, we can consider it to be like this:

public Long eval(
        Long baseTimeStampFromA,
        Long timestampA
        Long timestampB) {
  return baseTimeStampFromA + timestampB - timestampA;
}

Basically I need to redefine the event timestamp and watermark for the
output stream of a join operator.

You are right. Ideally I hope FlinkSQL can support defining a watermark on
a view.  Do you know if this was discussed in the Flink community before?
Wondering whether this may be supported in future.

On Thu, Apr 21, 2022 at 2:44 AM Shengkai Fang <fs...@gmail.com> wrote:

> Hi,
>
> The watermark of the join operator is the minimum of the watermark of the
> input streams.
>
> ```
> JoinOperator.watermark = min(left.watermark, right.watermark);
> ```
>
> I think it's enough for most cases.  Could you share more details about
> the logic in the UDF getEventTimeInNS?
>
> I think the better solution comparing to the intermediate table is to
> define the watermark on the VIEW. But Flink doesn't support it now.
>
> Best,
> Shengkai
>
>
>
>
> liuxiangcao <xi...@gmail.com> 于2022年4月16日周六 03:07写道:
>
>> Hi Flink community,
>>
>> *Here is the context: *
>> Theoretically, I would like to write following query but it won't work
>> since we can only define the WATERMARK in a table DDL:
>>
>> INSERT into tableC
>> select tableA.field1
>>          SUM(1) as `count`,
>>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> from tableA join tableB
>> on tableA.joinCol == tableB.joinCol
>> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
>> (note: getEventTimeInNS is a UDF that calculates event time using tableA.timestamp and tableB.timestamp)
>>
>>
>> so I have to define a intermediary table to store the results from
>> joining, and defining event time and watermark in the table DDL, then
>> performs tumbling windowing on the intermediary table:
>>
>> CREATE TABLE IntermediaryTable (
>>    field1,
>>   `eventTimestampInNanoseconds`  BIGINT,
>>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'IntermediaryTable',
>>   'properties.bootstrap.servers' = 'xxxxxx',
>>   'properties.group.id' = 'contextevent-streaming-sql',
>>   'format' = 'avro'
>> );
>>
>> INSERT INTO IntermediaryTable
>> select tableA.field1
>>           tableB.field2,
>>           getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>> from tableA join tableB
>> on tableA.joinCol == tableB.joinCol;
>>
>> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>>
>> INSERT INTO countTable
>> (select event.field1
>>         SUM(1) as `count`
>>  from IntermediaryTable event
>>  GROUP BY
>>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>>   event.field1
>> );
>>
>>
>> This is not convenient because the IntermediaryTable writes to another
>> kafka topic that is only used by the tumbling window aggregation. When I
>> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
>> END;", it will fail complaining the topic does not exist. I either have to
>> first create this kafka topic beforehand, or run a separate job to INSERT
>> INTO IntermediaryTable.
>>
>> In Java DataStream API, you can easily do so within flink topology
>> without having to create a separate kafka topic:
>>
>> final DataStream<xxx> joinedStream =
>>                  StreamA.join(StreamB)
>>                  .where(xxxx)
>>                  .equalTo(xxxx)
>>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>>                  .apply(aggregation);
>>
>>
>> *Question:*
>> Does the Flink community have any suggestions on how to do this in
>> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
>> defining eventtime and watermark on the fly without a table ddl? Would love
>> to hear any suggestions. Thanks a lot in advance.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>

-- 
Best Wishes & Regards
Shawn Xiangcao Liu

Re: How to define event time and Watermark for intermediary joined table in FlinkSQL?

Posted by Shengkai Fang <fs...@gmail.com>.
Hi,

The watermark of the join operator is the minimum of the watermark of the
input streams.

```
JoinOperator.watermark = min(left.watermark, right.watermark);
```

I think it's enough for most cases.  Could you share more details about the
logic in the UDF getEventTimeInNS?

I think the better solution comparing to the intermediate table is to
define the watermark on the VIEW. But Flink doesn't support it now.

Best,
Shengkai




liuxiangcao <xi...@gmail.com> 于2022年4月16日周六 03:07写道:

> Hi Flink community,
>
> *Here is the context: *
> Theoretically, I would like to write following query but it won't work
> since we can only define the WATERMARK in a table DDL:
>
> INSERT into tableC
> select tableA.field1
>          SUM(1) as `count`,
>          time_ltz AS getEventTimeInNS(tableA.timestamp, tableB.timestamp),
>          WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> from tableA join tableB
> on tableA.joinCol == tableB.joinCol
> group by TUMBLE(time_ltz, INTERVAL '30' SECOND), tableA.field1
> (note: getEventTimeInNS is a UDF that calculates event time using tableA.timestamp and tableB.timestamp)
>
>
> so I have to define a intermediary table to store the results from
> joining, and defining event time and watermark in the table DDL, then
> performs tumbling windowing on the intermediary table:
>
> CREATE TABLE IntermediaryTable (
>    field1,
>   `eventTimestampInNanoseconds`  BIGINT,
>    time_ltz AS TO_TIMESTAMP_LTZ(eventTimestampInNanoseconds/1000000, 3),
>    WATERMARK FOR time_ltz AS time_ltz - INTERVAL '20' SECONDS
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'IntermediaryTable',
>   'properties.bootstrap.servers' = 'xxxxxx',
>   'properties.group.id' = 'contextevent-streaming-sql',
>   'format' = 'avro'
> );
>
> INSERT INTO IntermediaryTable
> select tableA.field1
>           tableB.field2,
>           getEventTimeInNS(tableA.timestamp, tableB.timestamp),
> from tableA join tableB
> on tableA.joinCol == tableB.joinCol;
>
> Then, I can perform tumbling window aggregation on the IntermediaryTable:
>
> INSERT INTO countTable
> (select event.field1
>         SUM(1) as `count`
>  from IntermediaryTable event
>  GROUP BY
>   TUMBLE(event.time_ltz, INTERVAL '30' SECOND),
>   event.field1
> );
>
>
> This is not convenient because the IntermediaryTable writes to another
> kafka topic that is only used by the tumbling window aggregation. When I
> try to group the two INSERT INTO statements within "BEGIN STATEMENT SET;
> END;", it will fail complaining the topic does not exist. I either have to
> first create this kafka topic beforehand, or run a separate job to INSERT
> INTO IntermediaryTable.
>
> In Java DataStream API, you can easily do so within flink topology without
> having to create a separate kafka topic:
>
> final DataStream<xxx> joinedStream =
>                  StreamA.join(StreamB)
>                  .where(xxxx)
>                  .equalTo(xxxx)
>                  .window(TumblingProcessingTimeWindows.of(Time.seconds(30)))
>                  .apply(aggregation);
>
>
> *Question:*
> Does the Flink community have any suggestions on how to do this in
> FlinkSQL in a friendly way? Would it be a good idea for FlinkSQL to support
> defining eventtime and watermark on the fly without a table ddl? Would love
> to hear any suggestions. Thanks a lot in advance.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>