Posted to user@flink.apache.org by Elias Levy <fe...@gmail.com> on 2018/05/24 14:07:06 UTC

Multiple stream operator watermark handling

Is there a mechanism for a multiple stream operator to ignore watermarks from
one of the streams?

The use case is a multiple stream operator that consumes a primary stream
and a secondary control stream.  The control stream may only receive
messages on rare occasions, and possibly never.  The default behavior of the
operator is to emit only the lowest of the last watermarks received from
each input stream.  That means that event time fails to advance if there
are no control messages.
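A simplified model of the default behaviour described above may help. The sketch below is plain Scala, not Flink's source code; the class name `TwoInputWatermarkModel` and its methods are hypothetical. It assumes each input's last watermark starts at Long.MinValue and that the operator forwards the minimum of the two only when that minimum advances, which is how the stall arises when the control stream never emits a watermark.

```scala
// Simplified model (not Flink's source) of how a two-input operator combines
// watermarks: it remembers the last watermark seen on each input and forwards
// the minimum of the two, but only when that minimum advances.
final class TwoInputWatermarkModel {
  // Assumption: every input starts at Long.MinValue until a watermark arrives.
  private var input1Watermark: Long = Long.MinValue
  private var input2Watermark: Long = Long.MinValue
  private var combined: Long = Long.MinValue

  // Watermark on the primary input; returns Some(w) if a new watermark is emitted.
  def onWatermark1(ts: Long): Option[Long] = { input1Watermark = ts; advance() }

  // Watermark on the control input; same contract as onWatermark1.
  def onWatermark2(ts: Long): Option[Long] = { input2Watermark = ts; advance() }

  // Last watermark this operator has forwarded downstream.
  def currentOutput: Long = combined

  private def advance(): Option[Long] = {
    val newMin = math.min(input1Watermark, input2Watermark)
    if (newMin > combined) { combined = newMin; Some(newMin) } else None
  }
}
```

With only primary watermarks (for example `onWatermark1(1000L)` followed by `onWatermark1(2000L)`), `currentOutput` stays at Long.MinValue; the first control watermark is what releases it.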

I also notice that FLIP-17, the Side Input proposal, does not address this
issue, either in the Wiki or in the Google Docs.

Assuming there is no currently prescribed way to handle this, are folks
taking care of this by introducing a new Assigner after the multiple input
operator to generate watermarks?

Re: Multiple stream operator watermark handling

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Great to hear that this worked out for you :)

Watermark progression on an empty stream is a known issue that we are working to resolve in the future. The usually recommended workaround is to send a custom blank event (which the operator should ignore) once in a while.
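One way the blank-event workaround could look, sketched in plain Scala under stated assumptions: `ControlMsg`, `ConfigUpdate`, `Heartbeat` and `ControlHandling` are hypothetical names, and `controlWatermark` only approximates a bounded-out-of-orderness assigner (highest timestamp seen minus a fixed delay). The heartbeat carries a timestamp so the control stream's watermark keeps moving, while the business logic treats it as a no-op.

```scala
// Hypothetical control-stream message type: real updates plus a "blank"
// heartbeat whose only job is to carry an event timestamp.
sealed trait ControlMsg { def timestamp: Long }
final case class ConfigUpdate(timestamp: Long, key: String, value: String) extends ControlMsg
final case class Heartbeat(timestamp: Long) extends ControlMsg

object ControlHandling {
  // Business logic for the control input: apply updates, ignore heartbeats.
  def applyControl(state: Map[String, String], msg: ControlMsg): Map[String, String] =
    msg match {
      case ConfigUpdate(_, k, v) => state + (k -> v)
      case Heartbeat(_)          => state // blank event: no effect on state
    }

  // Assumed assigner behaviour: highest timestamp seen minus a fixed delay,
  // or Long.MinValue if the control stream has produced nothing at all.
  def controlWatermark(msgs: Seq[ControlMsg], maxOutOfOrderness: Long): Long =
    if (msgs.isEmpty) Long.MinValue
    else msgs.map(_.timestamp).max - maxOutOfOrderness
}
```

The design point is that heartbeats affect only event time, never state, so they can be sent as often as needed without changing the operator's results.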

I have expanded the documentation:
https://github.com/apache/flink/pull/6076
Please check it, and if you have any further suggestions, you are welcome to comment on the PR. I hope it clarifies the behaviour.

Piotrek

> On 25 May 2018, at 00:03, Elias Levy <fe...@gmail.com> wrote:
> 
> On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucidity@gmail.com> wrote:
> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <piotr@data-artisans.com> wrote:
> Off the top of my head I can imagine two solutions:
> 
> 1. Override the default behaviour of the operator via for example org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
> 
> That seems the safer, but more complicated path.
> 
> As we had already implemented the business logic in a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:
> 
> class SingleWatermarkCoFlatMap[IN1,IN2,OUT](flatMapper: CoFlatMapFunction[IN1,IN2,OUT]) extends CoStreamFlatMap(flatMapper)  {
> 
>   // Pass through the watermarks from the first stream
>   override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)
> 
>   // Ignore watermarks from the second stream
>   override def processWatermark2(mark: Watermark): Unit = {}
> }
> 
> 
> Then it was easy to replace:
> 
> stream1
>       .connect(stream2)
>       .flatMap( new BusinessCoFlatMapFunction(params) )
>         .name("Operator")
>         .uid("op")
> 
> with:
> 
> stream1
>       .connect(stream2)
>       .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
>       .uid("op")
> 
> 


Re: Multiple stream operator watermark handling

Posted by Elias Levy <fe...@gmail.com>.
On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fe...@gmail.com>
wrote:

> On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Off the top of my head I can imagine two solutions:
>>
>> 1. Override the default behaviour of the operator via for example
>> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>>
>
> That seems the safer, but more complicated path.
>

As we had already implemented the business logic in
a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

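import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
    extends CoStreamFlatMap[IN1, IN2, OUT](flatMapper) {

  // Pass through the watermarks from the first stream
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

  // Ignore watermarks from the second stream
  override def processWatermark2(mark: Watermark): Unit = {}
}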
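To make the effect of that override concrete, here is a plain-Scala model of it (hypothetical, not Flink code; `FirstInputOnlyWatermarkModel` is an illustrative name). Input-1 watermarks are forwarded unchanged, as `processWatermark(mark)` does, and input-2 watermarks are dropped, so output event time depends only on the primary stream.

```scala
// Simplified model (not Flink code) of SingleWatermarkCoFlatMap's watermark
// handling: forward every watermark from input 1, discard those from input 2.
final class FirstInputOnlyWatermarkModel {
  private var lastEmitted: Long = Long.MinValue

  // Mirrors `processWatermark1(mark) = processWatermark(mark)`: always forwarded.
  def onWatermark1(ts: Long): Option[Long] = { lastEmitted = ts; Some(ts) }

  // Mirrors `processWatermark2(mark) = {}`: nothing is emitted.
  def onWatermark2(ts: Long): Option[Long] = None

  // Last watermark forwarded downstream.
  def currentOutput: Long = lastEmitted
}
```

The trade-off is the same one raised against option 2 in this thread: any watermark information on the control stream is discarded entirely.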


Then it was easy to replace:

stream1
      .connect(stream2)
      .flatMap( new BusinessCoFlatMapFunction(params) )
        .name("Operator")
        .uid("op")

with:

stream1
      .connect(stream2)
      .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
      .uid("op")

Re: Multiple stream operator watermark handling

Posted by Elias Levy <fe...@gmail.com>.
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Off the top of my head I can imagine two solutions:
>
> 1. Override the default behaviour of the operator via for example
> org.apache.flink.streaming.api.datastream.ConnectedStreams#transform
>

That seems the safer, but more complicated path.


> 2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK or
> maybe Watermark#MAX_WATERMARK - 1?
>

That seems simpler, but potentially perilous if at some point in the future
there was some use to control stream watermarks.  Also, would it work if
there are no messages in the control stream?  Wouldn't that mean no
watermark would be emitted, even if they were hardcoded to Long.MAX_VALUE?
In which case, the operator default for the stream would be used, which
would still be Long.MIN_VALUE.
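The two cases in the question above can be checked with a small model of option 2 (plain Scala, hypothetical names; it assumes the operator forwards the minimum of the last watermark per input, with Long.MinValue for an input that has not yet delivered one). Watermark#MAX_WATERMARK corresponds to Long.MaxValue.

```scala
// Simplified model (not Flink's source) of option 2: the operator still takes
// the minimum of both inputs, but the control stream reports a pinned watermark.
object PinnedControlWatermark {
  // None means no watermark has arrived on that input yet, so the assumed
  // initial value Long.MinValue applies.
  def combined(primary: Option[Long], control: Option[Long]): Long =
    math.min(primary.getOrElse(Long.MinValue), control.getOrElse(Long.MinValue))

  // The alternative suggested in the thread: Watermark#MAX_WATERMARK - 1.
  val NearMax: Long = Long.MaxValue - 1
}
```

If the control stream does emit the pinned value, `combined(Some(1000L), Some(NearMax))` follows the primary stream (1000); if nothing ever arrives on the control input, `combined(Some(1000L), None)` stays at Long.MinValue, which is exactly the concern raised above.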


BTW, this reminds me of an issue I've mentioned previously: the
documentation lacks a description of how watermarks are processed
by operators.  E.g., when does a window emit watermarks?  What watermarks
does it emit?  That seems like a rather large omission, as one of the main
features of Flink is event time processing, which puts watermarks almost on
an equal footing with data and data operations.  Just as the docs describe how
data is processed, merged, etc., the same should be done for watermarks.

Re: Multiple stream operator watermark handling

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Off the top of my head I can imagine two solutions:

1. Override the default behaviour of the operator via for example org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

2. Can you set the control stream’s watermark to Watermark#MAX_WATERMARK or maybe Watermark#MAX_WATERMARK - 1?

Piotrek

> On 24 May 2018, at 16:07, Elias Levy <fe...@gmail.com> wrote:
> 
> Is there a mechanism for a multiple stream operator to ignore watermarks from one of the streams?
> 
> The use case is a multiple stream operator that consumes a primary stream and a secondary control stream.  The control stream may only receive messages on rare occasions, and possibly never.  The default behavior of the operator is to emit only the lowest of the last watermarks received from each input stream.  That means that event time fails to advance if there are no control messages.
> 
> I also notice that FLIP-17, the Side Input proposal, does not address this issue, either in the Wiki or in the Google Docs.
> 
> Assuming there is no currently prescribed way to handle this, are folks taking care of this by introducing a new Assigner after the multiple input operator to generate watermarks?
> 
>