Posted to user@flink.apache.org by Alexander Gryzlov <al...@gmail.com> on 2016/01/26 19:32:22 UTC

Streaming left outer join

Hello,

I'm trying to implement a left outer join of two Kafka streams within a
sliding window. So far I have the following code:

foos
  .coGroup(bars)
  .where(_.baz).equalTo(_.baz)
  .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
  .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) => {
    // Materialize the right side once: an Iterator can only be traversed a single time
    val rights = bs.toList
    fs.foreach(f =>
      if (rights.isEmpty)
        o.collect(FooBar(f, None))
      else
        rights.foreach(b => o.collect(FooBar(f, Some(b))))
    )
  })

However, this results in the pair being emitted from every window slide,
regardless of the match. The desired behaviour would be:
* emit the match as soon as it's found, and don't emit any more pairs for
it,
* otherwise, emit the empty match, when the left side element leaves the
last of its windows

What would be the idiomatic/efficient way to implement such behaviour? Is
it possible at all with the coGroup/window mechanism, or is some other
approach necessary?

Alex

Re: Streaming left outer join

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think hacking the StreamJoinOperator could work for you. In Flink, a StreamOperator is essentially a lower-level version of a FlatMap. It receives elements through the processElement() method and emits them using the Output object output, which is a souped-up Collector that also allows for watermark emission. The StreamOperator also has a StreamingRuntimeContext that can be used to register timer callbacks to perform work at specific time intervals or after timeouts. One thing to note is that StreamOperators always deal with StreamRecord<T>, which is a wrapper for the user type T that also carries a timestamp.
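
As a rough illustration of the bookkeeping such an operator would need, here is a minimal sketch in plain Scala that deliberately uses no Flink classes. Everything in it is an assumption made for illustration: the Foo/Bar/FooBar case classes (with a ts field), the class name LeftOuterJoinModel, and the methods processLeft, processRight and advanceTo, which stand in for the two element callbacks and a timer or watermark callback. It also assumes that "emit once" means each matching pair is emitted exactly once and that time only moves forward.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the thread's Foo, Bar and FooBar types.
case class Foo(baz: String, ts: Long)
case class Bar(baz: String, ts: Long)
case class FooBar(foo: Foo, bar: Option[Bar])

// Plain-Scala model of the join logic; `emit` plays the role of the output collector.
class LeftOuterJoinModel(windowMs: Long, emit: FooBar => Unit) {
  private case class Pending(foo: Foo, var matched: Boolean)
  private val lefts  = mutable.ArrayBuffer.empty[Pending]
  private val rights = mutable.ArrayBuffer.empty[Bar]

  // Two elements join when keys are equal and timestamps are less than one window apart.
  private def joins(f: Foo, b: Bar): Boolean =
    f.baz == b.baz && math.abs(f.ts - b.ts) < windowMs

  // Counterpart of handling an element from the left input.
  def processLeft(f: Foo): Unit = {
    val hits = rights.filter(b => joins(f, b))
    hits.foreach(b => emit(FooBar(f, Some(b)))) // each pair is emitted exactly once
    lefts += Pending(f, hits.nonEmpty)
  }

  // Counterpart of handling an element from the right input.
  def processRight(b: Bar): Unit = {
    lefts.filter(p => joins(p.foo, b)).foreach { p =>
      emit(FooBar(p.foo, Some(b)))
      p.matched = true
    }
    rights += b
  }

  // Counterpart of a timer or watermark callback: drop everything older than the window.
  def advanceTo(now: Long): Unit = {
    val (expired, live) = lefts.partition(p => p.foo.ts + windowMs <= now)
    // A left element that expires without any match yields the outer-join row.
    expired.filterNot(_.matched).foreach(p => emit(FooBar(p.foo, None)))
    lefts.clear(); lefts ++= live
    val liveRights = rights.filter(b => b.ts + windowMs > now)
    rights.clear(); rights ++= liveRights
  }
}
```

In a real operator the two buffers would have to live in checkpointed, per-key state, and advanceTo would be driven by registered timers or incoming watermarks; the sketch only shows the emit-once and emit-on-expiry logic.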

Cheers,
Aljoscha
> On 28 Jan 2016, at 14:31, Alexander Gryzlov <al...@gmail.com> wrote:
> 
> Hello Stephan,
> 
> Yes, I've seen this one before, but AFAIU this is a different use-case: they need an inner join with 2 different windows, whereas I'm ok with a single window, but need an outer join with different semantics... Their StreamJoinOperator, however, looks like a rough fit, so I'll probably start by hacking it, unless Aljoscha or somebody else with more experience than me has a better idea :)
> 
> Alex
> 
> On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
> 
> I think this pull request may be implementing what you are looking for: https://github.com/apache/flink/pull/1527
> 
> Stephan
> 
> 
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <al...@gmail.com> wrote:
> Hello Aljoscha,
> 
> Indeed, it seems like I'd need a custom operator. I imagine this involves implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could you provide those pointers please? 
> 
> Alex
> 
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <al...@apache.org> wrote:
> Hi,
> I’m afraid there is currently no way to do what you want with the built-in window primitives. Each of the slices of the sliding windows is essentially evaluated independently. Therefore, there cannot be effects in one slice that influence processing of another slice.
> 
> What you could do is switch to tumbling windows, then each element would only be in one window. That probably won’t fit your use case anymore. The alternative I see to that is to implement everything in a custom operator where you deal with window states and triggering on time yourself. Let me know if you need some pointers about that one.
> 
> Cheers,
> Aljoscha
> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <al...@gmail.com> wrote:
> >
> > Hello,
> >
> > I'm trying to implement a left outer join of two Kafka streams within a sliding window. So far I have the following code:
> >
> > foos
> >   .coGroup(bars)
> >   .where(_.baz).equalTo(_.baz)
> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
> >    fs.foreach(f =>
> >     if (bs.isEmpty)
> >       o.collect(FooBar(f, None))
> >     else
> >       bs.foreach(b => o.collect(FooBar(f, Some(b))))
> >    )
> >   )
> >
> > However, this results in the pair being emitted from every window slide, regardless of the match. The desired behaviour would be:
> > * emit the match as soon as it's found, don't emit any more pairs for it,
> > * otherwise, emit the empty match, when the left side element leaves the last of its windows
> >
> > What would be the idiomatic/efficient way to implement such behaviour? Is it possible at all with the coGroup/window mechanism, or some other way is necessary?
> >
> > Alex
> 
> 
> 
> 


Re: Streaming left outer join

Posted by Alexander Gryzlov <al...@gmail.com>.
Hello Stephan,

Yes, I've seen this one before, but AFAIU this is a different use-case:
they need an inner join with 2 different windows, whereas I'm ok with a
single window, but need an outer join with different semantics...
Their StreamJoinOperator, however, looks like a rough fit, so I'll probably
start by hacking it, unless Aljoscha or somebody else with more experience
than me has a better idea :)

Alex

On Wed, Jan 27, 2016 at 10:53 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> I think this pull request may be implementing what you are looking for:
> https://github.com/apache/flink/pull/1527
>
> Stephan
>
>
> On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <alex.gryzlov@gmail.com> wrote:
>
>> Hello Aljoscha,
>>
>> Indeed, it seems like I'd need a custom operator. I imagine this
>> involves
>> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could
>> you provide those pointers please?
>>
>> Alex
>>
>> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <al...@apache.org>
>> wrote:
>>
>>> Hi,
>>> I’m afraid there is currently no way to do what you want with the
>>> builtin window primitives. Each of the slices of the sliding windows is
>>> essentially evaluated independently. Therefore, there cannot be effects in
>>> one slice that influence processing of another slice.
>>>
>>> What you could do is switch to tumbling windows, then each element would
>>> only be in one window. That probably won’t fit your use case anymore. The
>>> alternative I see to that is to implement everything in a custom operator
>>> where you deal with window states and triggering on time yourself. Let me
>>> know if you need some pointers about that one.
>>>
>>> Cheers,
>>> Aljoscha
>>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <al...@gmail.com>
>>> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I'm trying to implement a left outer join of two Kafka streams within
>>> a sliding window. So far I have the following code:
>>> >
>>> > foos
>>> >   .coGroup(bars)
>>> >   .where(_.baz).equalTo(_.baz)
>>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES),
>>> Time.of(1, TimeUnit.SECONDS)))
>>> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar])
>>> =>
>>> >    fs.foreach(f =>
>>> >     if (bs.isEmpty)
>>> >       o.collect(FooBar(f, None))
>>> >     else
>>> >       bs.foreach(b => o.collect(FooBar(f, Some(b))))
>>> >    )
>>> >   )
>>> >
>>> > However, this results in the pair being emitted from every window
>>> slide, regardless of the match. The desired behaviour would be:
>>> > * emit the match as soon as it's found, don't emit any more pairs
>>> for it,
>>> > * otherwise, emit the empty match, when the left side element leaves
>>> the last of its windows
>>> >
>>> > What would be the idiomatic/efficient way to implement such behaviour?
>>> Is it possible at all with the coGroup/window mechanism, or some other way
>>> is necessary?
>>> >
>>> > Alex
>>>
>>>
>>
>

Re: Streaming left outer join

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I think this pull request may be implementing what you are looking for:
https://github.com/apache/flink/pull/1527

Stephan


On Wed, Jan 27, 2016 at 2:06 PM, Alexander Gryzlov <al...@gmail.com>
wrote:

> Hello Aljoscha,
>
> Indeed, it seems like I'd need a custom operator. I imagine this involves
> implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator? Could
> you provide those pointers please?
>
> Alex
>
> On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>> I’m afraid there is currently no way to do what you want with the
>> builtin window primitives. Each of the slices of the sliding windows is
>> essentially evaluated independently. Therefore, there cannot be effects in
>> one slice that influence processing of another slice.
>>
>> What you could do is switch to tumbling windows, then each element would
>> only be in one window. That probably won’t fit your use case anymore. The
>> alternative I see to that is to implement everything in a custom operator
>> where you deal with window states and triggering on time yourself. Let me
>> know if you need some pointers about that one.
>>
>> Cheers,
>> Aljoscha
>> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <al...@gmail.com>
>> wrote:
>> >
>> > Hello,
>> >
>> > I'm trying to implement a left outer join of two Kafka streams within a
>> sliding window. So far I have the following code:
>> >
>> > foos
>> >   .coGroup(bars)
>> >   .where(_.baz).equalTo(_.baz)
>> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES),
>> Time.of(1, TimeUnit.SECONDS)))
>> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>> >    fs.foreach(f =>
>> >     if (bs.isEmpty)
>> >       o.collect(FooBar(f, None))
>> >     else
>> >       bs.foreach(b => o.collect(FooBar(f, Some(b))))
>> >    )
>> >   )
>> >
>> > However, this results in the pair being emitted from every window
>> slide, regardless of the match. The desired behaviour would be:
>> > * emit the match as soon as it's found, don't emit any more pairs
>> for it,
>> > * otherwise, emit the empty match, when the left side element leaves
>> the last of its windows
>> >
>> > What would be the idiomatic/efficient way to implement such behaviour?
>> Is it possible at all with the coGroup/window mechanism, or some other way
>> is necessary?
>> >
>> > Alex
>>
>>
>

Re: Streaming left outer join

Posted by Alexander Gryzlov <al...@gmail.com>.
Hello Aljoscha,

Indeed, it seems like I'd need a custom operator. I imagine this involves
implementing org.apache.flink.streaming.api.operators.TwoInputStreamOperator?
Could you provide those pointers, please?

Alex

On Wed, Jan 27, 2016 at 12:03 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
> I’m afraid there is currently no way to do what you want with the built-in
> window primitives. Each of the slices of the sliding windows is essentially
> evaluated independently. Therefore, there cannot be effects in one slice
> that influence processing of another slice.
>
> What you could do is switch to tumbling windows, then each element would
> only be in one window. That probably won’t fit your use case anymore. The
> alternative I see to that is to implement everything in a custom operator
> where you deal with window states and triggering on time yourself. Let me
> know if you need some pointers about that one.
>
> Cheers,
> Aljoscha
> > On 26 Jan 2016, at 19:32, Alexander Gryzlov <al...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > I'm trying to implement a left outer join of two Kafka streams within a
> sliding window. So far I have the following code:
> >
> > foos
> >   .coGroup(bars)
> >   .where(_.baz).equalTo(_.baz)
> >   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1,
> TimeUnit.SECONDS)))
> >   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
> >    fs.foreach(f =>
> >     if (bs.isEmpty)
> >       o.collect(FooBar(f, None))
> >     else
> >       bs.foreach(b => o.collect(FooBar(f, Some(b))))
> >    )
> >   )
> >
> > However, this results in the pair being emitted from every window slide,
> regardless of the match. The desired behaviour would be:
> > * emit the match as soon as it's found, don't emit any more pairs
> for it,
> > * otherwise, emit the empty match, when the left side element leaves the
> last of its windows
> >
> > What would be the idiomatic/efficient way to implement such behaviour?
> Is it possible at all with the coGroup/window mechanism, or some other way
> is necessary?
> >
> > Alex
>
>

Re: Streaming left outer join

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I’m afraid there is currently no way to do what you want with the built-in window primitives. Each of the slices of the sliding windows is essentially evaluated independently. Therefore, there cannot be effects in one slice that influence processing of another slice.

What you could do is switch to tumbling windows; then each element would be in only one window. That probably won’t fit your use case, though. The alternative I see is to implement everything in a custom operator where you deal with window state and time-based triggering yourself. Let me know if you need some pointers about that one.
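
To make the difference between sliding and tumbling windows concrete, here is a minimal sketch in plain Scala (no Flink classes) that lists the windows a single timestamp falls into. The helper name windowStarts, the half-open window [start, start + size), and the alignment of window starts to multiples of the slide are assumptions for illustration, not a description of Flink's internals.

```scala
object SlidingWindowCount {
  // Start timestamps (in ms) of every window of length `size`, advancing by `slide`,
  // that contains `ts`. Assumes ts >= 0 and windows of the form [start, start + size).
  def windowStarts(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - (ts % slide)       // latest window start that is <= ts
    lastStart until (ts - size) by -slide   // walk back while the window still covers ts
  }
}

// 1-minute windows sliding every second: the element lands in 60 windows.
println(SlidingWindowCount.windowStarts(90000L, 60000L, 1000L).size)
// 1-minute tumbling windows (slide == size): the element lands in exactly one window.
println(SlidingWindowCount.windowStarts(90000L, 60000L, 60000L).size)
```

With the settings from the original question, every element is therefore handed to the coGroup function once per window it belongs to, which is where the repeated pairs come from.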

Cheers,
Aljoscha
> On 26 Jan 2016, at 19:32, Alexander Gryzlov <al...@gmail.com> wrote:
> 
> Hello, 
> 
> I'm trying to implement a left outer join of two Kafka streams within a sliding window. So far I have the following code:
> 
> foos
>   .coGroup(bars)
>   .where(_.baz).equalTo(_.baz)
>   .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.MINUTES), Time.of(1, TimeUnit.SECONDS)))
>   .apply((fs: Iterator[Foo], bs: Iterator[Bar], o: Collector[FooBar]) =>
>    fs.foreach(f =>
>     if (bs.isEmpty)
>       o.collect(FooBar(f, None))
>     else
>       bs.foreach(b => o.collect(FooBar(f, Some(b))))
>    )
>   )
> 
> However, this results in the pair being emitted from every window slide, regardless of the match. The desired behaviour would be:
> * emit the match as soon as it's found, don't emit any more pairs for it,
> * otherwise, emit the empty match, when the left side element leaves the last of its windows
> 
> What would be the idiomatic/efficient way to implement such behaviour? Is it possible at all with the coGroup/window mechanism, or some other way is necessary?
> 
> Alex