Posted to user@flink.apache.org by Seth Wiesman <sw...@mediamath.com> on 2017/12/13 16:10:19 UTC

Watermark in broadcast

Hi,

How are watermarks propagated during a broadcast partition? I have a TwoInputStreamTransformation that takes a broadcast stream as one of its inputs. Both streams are assigned timestamps and watermarks before being connected; however, I only ever see watermarks from my non-broadcast stream. Is this expected behavior? Currently, I have overridden processWatermark1 to unconditionally call processWatermark, but that does not seem like an ideal solution.
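
For illustration, a minimal Java sketch of the watermark rule usually described for two-input operators. TwoInputWatermarkModel is a hypothetical, self-contained class, not Flink's code, and treating input 2 as the broadcast side is an assumption: the operator's event time is the minimum of the two inputs' latest watermarks, and it only moves when that minimum advances.

```java
// Hypothetical model (not Flink source code) of how a two-input operator
// combines the watermarks of its two inputs.
class TwoInputWatermarkModel {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermark on input 1; returns the new combined watermark, or null if it did not advance. */
    Long onWatermark1(long watermark) {
        input1Watermark = watermark;
        return maybeAdvance();
    }

    /** Watermark on input 2 (assumed here to be the broadcast side). */
    Long onWatermark2(long watermark) {
        input2Watermark = watermark;
        return maybeAdvance();
    }

    // The operator's event time only moves when the minimum of both inputs moves.
    private Long maybeAdvance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return newMin;
        }
        return null;
    }

    long combined() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkModel op = new TwoInputWatermarkModel();
        // Input 2 has not sent anything yet, so nothing is emitted.
        System.out.println(op.onWatermark1(1000L)); // null
        System.out.println(op.onWatermark2(500L));  // 500
        System.out.println(op.onWatermark1(2000L)); // null: still held at 500 by input 2
    }
}
```

Under this model, an input that never reports a watermark stays at Long.MIN_VALUE and holds the combined watermark back. Forwarding one input's watermark unconditionally, as the processWatermark1 override described above does, skips the Math.min step, so the operator no longer waits for the other input.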

Thank you,
<http://www.mediamath.com/>

Seth Wiesman | Software Engineer, Data

4 World Trade Center, 46th Floor, New York, NY 10007




Re: Watermark in broadcast

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Seth,

Some clarifications to point out:

> Quick follow-up question. Is there some way to notify a TimestampAssigner that is consuming from an idle source?

In Flink, idle sources emit a special idleness marker event that notifies downstream time-based operators not to wait for their watermarks.
This avoids the need for the TimestampAssigner to generate watermarks just so that downstream operators can advance their watermarks when sources are idle.
However, there are two cases of idle sources, and only one of them is handled at the moment: 1) the source subtask simply has no Kafka partitions to read from, or 2) the Kafka partitions do not have any records.
Only case 1) is handled, as of Flink 1.3.

> I think you are correct. This stream is consumed from Kafka, and the number of partitions is much smaller than the parallelism of the program, so there would be many partitions that never forward watermarks greater than Long.MIN_VALUE.

In this case, Flink consumer subtasks that do not have Kafka partitions would mark themselves as idle and emit the special idleness marker.
Therefore, the expected behavior is that downstream time-based operators will not wait on these idle sources, even if they don’t produce watermarks.
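
A minimal sketch of this behavior, assuming a simplified model (the hypothetical class IdleAwareWatermarkSketch, not Flink's implementation): the operator's watermark is the minimum over its input channels, but channels that have sent the idleness marker are left out of that minimum. As a further simplification, a channel counts as active again as soon as it sends a watermark.

```java
import java.util.Arrays;

// Hypothetical, simplified model (not Flink's implementation) of an operator input
// that ignores idle channels when computing its watermark.
class IdleAwareWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    /** Records a watermark; simplification: sending a watermark makes the channel active again. */
    Long onWatermark(int channel, long watermark) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return maybeAdvance();
    }

    /** Records the idleness marker for a channel. */
    Long onIdle(int channel) {
        channelIdle[channel] = true;
        return maybeAdvance();
    }

    // Minimum over active channels only; returns the new watermark, or null if it did not advance.
    private Long maybeAdvance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > combined) {
            combined = min;
            return min;
        }
        return null;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch input = new IdleAwareWatermarkSketch(3);
        System.out.println(input.onWatermark(0, 1000L)); // null: channels 1 and 2 still at Long.MIN_VALUE
        System.out.println(input.onIdle(2));             // null: channel 1 still at Long.MIN_VALUE
        System.out.println(input.onWatermark(1, 800L));  // 800: idle channel 2 is ignored
    }
}
```

Without the onIdle call for channel 2, the last call would also return null, because channel 2 would still be at Long.MIN_VALUE.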

Best,
Gordon

On 14 December 2017 at 6:08:20 PM, Fabian Hueske (fhueske@gmail.com) wrote:

Hi Seth,

that's not possible with the current interface.
There have been some discussions about how to address issues of idle sources (or partitions).
Aljoscha (in CC) should know more about that.

Best, Fabian

2017-12-13 18:13 GMT+01:00 Seth Wiesman <sw...@mediamath.com>:
Quick follow-up question. Is there some way to notify a TimestampAssigner that is consuming from an idle source?

From: Seth Wiesman <sw...@mediamath.com>
Date: Wednesday, December 13, 2017 at 12:04 PM
To: Timo Walther <tw...@apache.org>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Watermark in broadcast

Hi Timo,

I think you are correct. This stream is consumed from Kafka, and the number of partitions is much smaller than the parallelism of the program, so there would be many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.

From: Timo Walther <tw...@apache.org>
Date: Wednesday, December 13, 2017 at 11:46 AM
To: "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Watermark in broadcast

Hi Seth,

Are you sure that all partitions of the broadcast stream send a watermark? processWatermark is only called once a minimum watermark has arrived from all partitions.

Regards,
Timo

On 12/13/17 at 5:10 PM, Seth Wiesman wrote:

Hi,

How are watermarks propagated during a broadcast partition? I have a TwoInputStreamTransformation that takes a broadcast stream as one of its inputs. Both streams are assigned timestamps and watermarks before being connected; however, I only ever see watermarks from my non-broadcast stream. Is this expected behavior? Currently, I have overridden processWatermark1 to unconditionally call processWatermark, but that does not seem like an ideal solution.

Thank you,


Re: Watermark in broadcast

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Seth,

that's not possible with the current interface.
There have been some discussions about how to address issues of idle
sources (or partitions).
Aljoscha (in CC) should know more about that.

Best, Fabian


Re: Watermark in broadcast

Posted by Seth Wiesman <sw...@mediamath.com>.
Quick follow-up question. Is there some way to notify a TimestampAssigner that is consuming from an idle source?

Re: Watermark in broadcast

Posted by Seth Wiesman <sw...@mediamath.com>.
Hi Timo,

I think you are correct. This stream is consumed from Kafka, and the number of partitions is much smaller than the parallelism of the program, so there would be many partitions that never forward watermarks greater than Long.MIN_VALUE.

Thank you for the quick response.

Re: Watermark in broadcast

Posted by Timo Walther <tw...@apache.org>.
Hi Seth,

Are you sure that all partitions of the broadcast stream send a
watermark? processWatermark is only called once a minimum watermark
has arrived from all partitions.

Regards,
Timo
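
A minimal sketch of the rule described here, assuming that each upstream parallel instance of the broadcast stream feeds one input channel of every downstream task (inputWatermark is a hypothetical helper, not a Flink API): the watermark of that input is the minimum over all of its channels, so one channel that never sends a watermark keeps it at Long.MIN_VALUE.

```java
import java.util.Arrays;

// Hypothetical helper (not a Flink API): the watermark of one operator input is
// the minimum of the last watermark received on each of its input channels.
class InputChannelMinSketch {
    static long inputWatermark(long[] lastWatermarkPerChannel) {
        return Arrays.stream(lastWatermarkPerChannel).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Broadcast input fed by 4 upstream subtasks; subtask 3 never sent a watermark.
        long[] channels = {5000L, 4200L, 6100L, Long.MIN_VALUE};
        System.out.println(inputWatermark(channels)); // -9223372036854775808 (Long.MIN_VALUE)
        channels[3] = 4000L;
        System.out.println(inputWatermark(channels)); // 4000
    }
}
```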
