You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dominik Wosiński <wo...@gmail.com> on 2020/03/16 15:55:18 UTC

Issues with Watermark generation after join

Hey,
I have noticed a weird behavior with a job that I am currently working on.
I have 4 different streams from Kafka, lets call them A, B, C and D. Now
the idea is that first I do SQL Join of A & B based on some field, then I
create append stream from Joined A&B, let's call it E. Then I need to
assign timestamps to E since it is a result of joining and Flink can't
figure out the timestamps.

Next, I union E & C, to create some F stream. Then finally I connect E & C
using `keyBy` and CoProcessFunction. Now the issue I am facing is that if I
try to, it works fine if I enforce the parallelism of E to be 1 by invoking
*setParallelism*. But if parallelism is higher than 1, for the same data -
the watermark is not progressing correctly. I can see that *CoProcessFunction
*methods are invoked and that data is produced, but the Watermark is never
progressing for this function. What I can see is that watermark is always
equal to (0 - allowedOutOfOrderness). I can see that timestamps are
correctly extracted and when I add debug prints I can actually see that
Watermarks are generated for all streams, but for some reason, if the
parallelism is > 1 they will never progress up to connect function. Is
there anything that needs to be done after SQL joins that I don't know of
??

Best Regards,
Dom.

Re: Issues with Watermark generation after join

Posted by Timo Walther <tw...@apache.org>.
Or better: "But for sources, you need to emit a watermark from all 
sources in order to have progress in event-time."


On 24.03.20 13:09, Timo Walther wrote:
> Hi,
> 
> 1) yes with "partition" I meant "parallel instance".
> 
> If the watermarking is correct in the DataStream API. The Table API and 
> SQL will take care that it remains correct. E.g. you can only perform a 
> TUMBLE window if the timestamp column has not lost its time attribute 
> property. A regular JOIN (not time-versioned) does not work with 
> watermarks, thus, the result will not have time attributes anymore. A 
> subsequent TUMBLE window usage will fail with an exception.
> 
> 2) You don't need output. Most operators deal with watermarking logic. 
> But for sources, you need output from all sources in order to have 
> progress in event-time.
> 
> Regards,
> Timo
> 
> 
> On 24.03.20 12:21, Dominik Wosiński wrote:
>> Hey Timo,
>> Thanks a lot for this answer! I was mostly using the DataStream API, 
>> so that's good to know the difference.
>> I have followup questions then, I will be glad for clarification:
>>
>> 1) So, for the SQL Join operator, is the /partition /the parallel 
>> instance of operator or is it the table partitioning as defined by 
>> /partitionBy ??/
>> 2) Assuming that this is instance of parallel operator, does this mean 
>> that we need output from ALL operators so that the watermark 
>> progresses and the output is generated?
>>
>> Best Regards,
>> Dom.
>>
>> wt., 24 mar 2020 o 10:01 Timo Walther <twalthr@apache.org 
>> <ma...@apache.org>> napisał(a):
>>
>>     Hi Dominik,
>>
>>     the big conceptual difference between DataStream and Table API is 
>> that
>>     record timestamps are part of the schema in Table API whereas they 
>> are
>>     attached internally to each record in DataStream API. When you call
>>     `y.rowtime` during a stream to table conversion, the runtime will
>>     extract the internal timestamp and will copy it into the field `y`.
>>
>>     Even if the timestamp is not internally anymore, Flink makes sure 
>> that
>>     the watermarking (which still happens internally) remains valid.
>>     However, this means that timestamps and watermarks must already be
>>     correct when entering the Table API. If they were not correct before,
>>     they will also not trigger time-based operations correctly.
>>
>>     If there is no output for a parallelism > 1, usually this means that
>>     one
>>     source parition has not emitted a watermark to have progress globally
>>     for the job:
>>
>>     watermark of operator = min(previous operator partition 1, previous
>>     operator partition 2, ...)
>>
>>     I hope this helps.
>>
>>     Regards,
>>     Timo
>>
>>
>>     On 19.03.20 16:38, Dominik Wosiński wrote:
>>      > I have created a simple minimal reproducible example that shows
>>     what I
>>      > am talking about:
>>      > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
>>      >
>>      > It contains a test that shows that even if the output is in order
>>     which
>>      > is enforced by multiple sleeps, then for parallelism > 1 there 
>> is no
>>      > output and for parallelism == 1, the output is produced normally.
>>      >
>>      > Best Regards,
>>      > Dom.
>>


Re: Issues with Watermark generation after join

Posted by Timo Walther <tw...@apache.org>.
Hi,

1) yes with "partition" I meant "parallel instance".

If the watermarking is correct in the DataStream API. The Table API and 
SQL will take care that it remains correct. E.g. you can only perform a 
TUMBLE window if the timestamp column has not lost its time attribute 
property. A regular JOIN (not time-versioned) does not work with 
watermarks, thus, the result will not have time attributes anymore. A 
subsequent TUMBLE window usage will fail with an exception.

2) You don't need output. Most operators deal with watermarking logic. 
But for sources, you need output from all sources in order to have 
progress in event-time.

Regards,
Timo


On 24.03.20 12:21, Dominik Wosiński wrote:
> Hey Timo,
> Thanks a lot for this answer! I was mostly using the DataStream API, so 
> that's good to know the difference.
> I have followup questions then, I will be glad for clarification:
> 
> 1) So, for the SQL Join operator, is the /partition /the parallel 
> instance of operator or is it the table partitioning as defined by 
> /partitionBy ??/
> 2) Assuming that this is instance of parallel operator, does this mean 
> that we need output from ALL operators so that the watermark progresses 
> and the output is generated?
> 
> Best Regards,
> Dom.
> 
> wt., 24 mar 2020 o 10:01 Timo Walther <twalthr@apache.org 
> <ma...@apache.org>> napisał(a):
> 
>     Hi Dominik,
> 
>     the big conceptual difference between DataStream and Table API is that
>     record timestamps are part of the schema in Table API whereas they are
>     attached internally to each record in DataStream API. When you call
>     `y.rowtime` during a stream to table conversion, the runtime will
>     extract the internal timestamp and will copy it into the field `y`.
> 
>     Even if the timestamp is not internally anymore, Flink makes sure that
>     the watermarking (which still happens internally) remains valid.
>     However, this means that timestamps and watermarks must already be
>     correct when entering the Table API. If they were not correct before,
>     they will also not trigger time-based operations correctly.
> 
>     If there is no output for a parallelism > 1, usually this means that
>     one
>     source parition has not emitted a watermark to have progress globally
>     for the job:
> 
>     watermark of operator = min(previous operator partition 1, previous
>     operator partition 2, ...)
> 
>     I hope this helps.
> 
>     Regards,
>     Timo
> 
> 
>     On 19.03.20 16:38, Dominik Wosiński wrote:
>      > I have created a simple minimal reproducible example that shows
>     what I
>      > am talking about:
>      > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
>      >
>      > It contains a test that shows that even if the output is in order
>     which
>      > is enforced by multiple sleeps, then for parallelism > 1 there is no
>      > output and for parallelism == 1, the output is produced normally.
>      >
>      > Best Regards,
>      > Dom.
> 


Re: Issues with Watermark generation after join

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey Timo,
Thanks a lot for this answer! I was mostly using the DataStream API, so
that's good to know the difference.
I have followup questions then, I will be glad for clarification:

1) So, for the SQL Join operator, is the *partition *the parallel instance
of operator or is it the table partitioning as defined by *partitionBy ??*
2) Assuming that this is instance of parallel operator, does this mean that
we need output from ALL operators so that the watermark progresses and the
output is generated?

Best Regards,
Dom.

wt., 24 mar 2020 o 10:01 Timo Walther <tw...@apache.org> napisał(a):

> Hi Dominik,
>
> the big conceptual difference between DataStream and Table API is that
> record timestamps are part of the schema in Table API whereas they are
> attached internally to each record in DataStream API. When you call
> `y.rowtime` during a stream to table conversion, the runtime will
> extract the internal timestamp and will copy it into the field `y`.
>
> Even if the timestamp is not internally anymore, Flink makes sure that
> the watermarking (which still happens internally) remains valid.
> However, this means that timestamps and watermarks must already be
> correct when entering the Table API. If they were not correct before,
> they will also not trigger time-based operations correctly.
>
> If there is no output for a parallelism > 1, usually this means that one
> source parition has not emitted a watermark to have progress globally
> for the job:
>
> watermark of operator = min(previous operator partition 1, previous
> operator partition 2, ...)
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 19.03.20 16:38, Dominik Wosiński wrote:
> > I have created a simple minimal reproducible example that shows what I
> > am talking about:
> > https://github.com/DomWos/FlinkTTF/tree/sql-ttf
> >
> > It contains a test that shows that even if the output is in order which
> > is enforced by multiple sleeps, then for parallelism > 1 there is no
> > output and for parallelism == 1, the output is produced normally.
> >
> > Best Regards,
> > Dom.
>
>

Re: Issues with Watermark generation after join

Posted by Timo Walther <tw...@apache.org>.
Hi Dominik,

the big conceptual difference between DataStream and Table API is that 
record timestamps are part of the schema in Table API whereas they are 
attached internally to each record in DataStream API. When you call 
`y.rowtime` during a stream to table conversion, the runtime will 
extract the internal timestamp and will copy it into the field `y`.

Even if the timestamp is not internally anymore, Flink makes sure that 
the watermarking (which still happens internally) remains valid. 
However, this means that timestamps and watermarks must already be 
correct when entering the Table API. If they were not correct before, 
they will also not trigger time-based operations correctly.

If there is no output for a parallelism > 1, usually this means that one 
source parition has not emitted a watermark to have progress globally 
for the job:

watermark of operator = min(previous operator partition 1, previous 
operator partition 2, ...)

I hope this helps.

Regards,
Timo


On 19.03.20 16:38, Dominik Wosiński wrote:
> I have created a simple minimal reproducible example that shows what I 
> am talking about:
> https://github.com/DomWos/FlinkTTF/tree/sql-ttf
> 
> It contains a test that shows that even if the output is in order which 
> is enforced by multiple sleeps, then for parallelism > 1 there is no 
> output and for parallelism == 1, the output is produced normally.
> 
> Best Regards,
> Dom.


Re: Issues with Watermark generation after join

Posted by Dominik Wosiński <wo...@gmail.com>.
I have created a simple minimal reproducible example that shows what I am
talking about:
https://github.com/DomWos/FlinkTTF/tree/sql-ttf

It contains a test that shows that even if the output is in order which is
enforced by multiple sleeps, then for parallelism > 1 there is no output
and for parallelism == 1, the output is produced normally.

Best Regards,
Dom.

Re: Issues with Watermark generation after join

Posted by Dominik Wosiński <wo...@gmail.com>.
Hey sure,
the original Temporal Table SQL is:

|SELECT e.*, f.level as level FROM
| enablers AS e,
| LATERAL TABLE (Detectors(e.timestamp)) AS f
| WHERE e.id= f.id
|""

And the previous SQL query to join A&B is something like :

SELECT *
| FROM A te,
| B s
| WHERE s.id = te.id AND s.level = te.level AND s.timestamp = te.timestamp


Also, if I replace the SQL to Join A&B with BroadcastProcessFunction this
works like a charm, everything is calculated correctly. Even if I don't
change the parallelism.

I have noticed one more weird behavior, after the temporal table Join I
have a windowing function to process the data. Now I have two options, in
TTF I can select the rowtime with type Timestamp and assign it to field in
output class, this automatically passes the Timestamp over so I don't need
to assign it again. But I could also select just a Long field that is not
marked as rowtime (even if they actually have the same value but this field
was not marked with *.rowtime* on declaration) and then I will need to
assign the timestamps and watermarks again, since Flink doesn't now what is
the timestamp. Now, the former solution works like a charm, but for the
latter one there is actually no output visible from the windowing function.
My expectation is that both solutions should work exactly the same and pass
the timestamps in the same manner, but apparently they are don't.

Best Regards,
Dom.

>

Re: Issues with Watermark generation after join

Posted by Kurt Young <yk...@gmail.com>.
Hi, could you share the SQL you written for your original purpose, not the
one you attached ProcessFunction for debugging?

Best,
Kurt


On Tue, Mar 17, 2020 at 3:08 AM Dominik Wosiński <wo...@gmail.com> wrote:

> Actually, I just put this process function there for debugging purposes.
> My main goal is to join the E & C using the Temporal Table function, but I
> have observed exactly the same behavior i.e. when the parallelism was > 1
> there was no output and when I was setting it to 1 then the output was
> generated. So, I have switched to process function to see whether the
> watermarks are reaching this stage.
>
> Best Regards,
> Dom.
>
> pon., 16 mar 2020 o 19:46 Theo Diefenthal <
> theo.diefenthal@scoop-software.de> napisał(a):
>
>> Hi Dominik,
>>
>> I had the same once with a custom processfunction. My processfunction
>> buffered the data for a while and then output it again. As the proces
>> function can do anything with the data (transforming, buffering,
>> aggregating...), I think it's just not safe for flink to reason about the
>> watermark of the output.
>>
>> I solved all my issues by calling `assignTimestampsAndWatermarks`
>> directly post to the (co-)process function.
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> *Von: *"Dominik Wosiński" <wo...@gmail.com>
>> *An: *"user" <us...@flink.apache.org>
>> *Gesendet: *Montag, 16. März 2020 16:55:18
>> *Betreff: *Issues with Watermark generation after join
>>
>> Hey,
>> I have noticed a weird behavior with a job that I am currently working
>> on. I have 4 different streams from Kafka, lets call them A, B, C and D.
>> Now the idea is that first I do SQL Join of A & B based on some field, then
>> I create append stream from Joined A&B, let's call it E. Then I need to
>> assign timestamps to E since it is a result of joining and Flink can't
>> figure out the timestamps.
>>
>> Next, I union E & C, to create some F stream. Then finally I connect E &
>> C using `keyBy` and CoProcessFunction. Now the issue I am facing is that if
>> I try to, it works fine if I enforce the parallelism of E to be 1 by
>> invoking *setParallelism*. But if parallelism is higher than 1, for the
>> same data - the watermark is not progressing correctly. I can see that *CoProcessFunction
>> *methods are invoked and that data is produced, but the Watermark is
>> never progressing for this function. What I can see is that watermark is
>> always equal to (0 - allowedOutOfOrderness). I can see that timestamps are
>> correctly extracted and when I add debug prints I can actually see that
>> Watermarks are generated for all streams, but for some reason, if the
>> parallelism is > 1 they will never progress up to connect function. Is
>> there anything that needs to be done after SQL joins that I don't know of
>> ??
>>
>> Best Regards,
>> Dom.
>>
>

Re: Issues with Watermark generation after join

Posted by Dominik Wosiński <wo...@gmail.com>.
Actually, I just put this process function there for debugging purposes. My
main goal is to join the E & C using the Temporal Table function, but I
have observed exactly the same behavior i.e. when the parallelism was > 1
there was no output and when I was setting it to 1 then the output was
generated. So, I have switched to process function to see whether the
watermarks are reaching this stage.

Best Regards,
Dom.

pon., 16 mar 2020 o 19:46 Theo Diefenthal <th...@scoop-software.de>
napisał(a):

> Hi Dominik,
>
> I had the same once with a custom processfunction. My processfunction
> buffered the data for a while and then output it again. As the proces
> function can do anything with the data (transforming, buffering,
> aggregating...), I think it's just not safe for flink to reason about the
> watermark of the output.
>
> I solved all my issues by calling `assignTimestampsAndWatermarks` directly
> post to the (co-)process function.
>
> Best regards
> Theo
>
> ------------------------------
> *Von: *"Dominik Wosiński" <wo...@gmail.com>
> *An: *"user" <us...@flink.apache.org>
> *Gesendet: *Montag, 16. März 2020 16:55:18
> *Betreff: *Issues with Watermark generation after join
>
> Hey,
> I have noticed a weird behavior with a job that I am currently working on.
> I have 4 different streams from Kafka, lets call them A, B, C and D. Now
> the idea is that first I do SQL Join of A & B based on some field, then I
> create append stream from Joined A&B, let's call it E. Then I need to
> assign timestamps to E since it is a result of joining and Flink can't
> figure out the timestamps.
>
> Next, I union E & C, to create some F stream. Then finally I connect E & C
> using `keyBy` and CoProcessFunction. Now the issue I am facing is that if I
> try to, it works fine if I enforce the parallelism of E to be 1 by invoking
> *setParallelism*. But if parallelism is higher than 1, for the same data
> - the watermark is not progressing correctly. I can see that *CoProcessFunction
> *methods are invoked and that data is produced, but the Watermark is
> never progressing for this function. What I can see is that watermark is
> always equal to (0 - allowedOutOfOrderness). I can see that timestamps are
> correctly extracted and when I add debug prints I can actually see that
> Watermarks are generated for all streams, but for some reason, if the
> parallelism is > 1 they will never progress up to connect function. Is
> there anything that needs to be done after SQL joins that I don't know of
> ??
>
> Best Regards,
> Dom.
>

Re: Issues with Watermark generation after join

Posted by Theo Diefenthal <th...@scoop-software.de>.
Hi Dominik, 

I had the same once with a custom processfunction. My processfunction buffered the data for a while and then output it again. As the proces function can do anything with the data (transforming, buffering, aggregating...), I think it's just not safe for flink to reason about the watermark of the output. 

I solved all my issues by calling `assignTimestampsAndWatermarks` directly post to the (co-)process function. 

Best regards 
Theo 


Von: "Dominik Wosiński" <wo...@gmail.com> 
An: "user" <us...@flink.apache.org> 
Gesendet: Montag, 16. März 2020 16:55:18 
Betreff: Issues with Watermark generation after join 

Hey, 
I have noticed a weird behavior with a job that I am currently working on. I have 4 different streams from Kafka, lets call them A, B, C and D. Now the idea is that first I do SQL Join of A & B based on some field, then I create append stream from Joined A&B, let's call it E. Then I need to assign timestamps to E since it is a result of joining and Flink can't figure out the timestamps. 

Next, I union E & C, to create some F stream. Then finally I connect E & C using `keyBy` and CoProcessFunction. Now the issue I am facing is that if I try to, it works fine if I enforce the parallelism of E to be 1 by invoking setParallelism . But if parallelism is higher than 1, for the same data - the watermark is not progressing correctly. I can see that CoProcessFunction methods are invoked and that data is produced, but the Watermark is never progressing for this function. What I can see is that watermark is always equal to (0 - allowedOutOfOrderness). I can see that timestamps are correctly extracted and when I add debug prints I can actually see that Watermarks are generated for all streams, but for some reason, if the parallelism is > 1 they will never progress up to connect function. Is there anything that needs to be done after SQL joins that I don't know of ?? 

Best Regards, 
Dom.