Posted to user@flink.apache.org by Henry Cai <hc...@pinterest.com> on 2016/04/14 06:32:36 UTC

streaming join implementation

Hi,

We are evaluating different streaming platforms.  For a typical join
between two streams

SELECT a.*, b.*
FROM a JOIN b
  ON a.id = b.id

How does Flink implement the join?  A matching record from either stream
can come late; we consider it a valid join as long as the event times for
records a and b fall on the same day.

I think some streaming platforms (e.g. Google Dataflow) store the records
from both streams in a K/V lookup store and do the lookup later.  Is this
how Flink implements the streaming join?

If we need to store all the records in a state store, that's going to be a
lot of records for a day.

Re: streaming join implementation

Posted by Aljoscha Krettek <al...@apache.org>.
I'll try and answer both questions.

Regarding Henry's question about very large state and caching: this depends
on the StateBackend. The FsStateBackend has to keep all state on the JVM
heap in hash maps. If you have an appropriate number of machines with large
memory this could still work. If you enable checkpointing, the
FsStateBackend will write the whole state into HDFS for every checkpoint;
this is a blocking operation, so if it takes too long you will see latency.
The RocksDB state backend keeps the state in a RocksDB database on disk, so
the state can grow as large as disk space allows. The problem here is just
that Flink will try to checkpoint the whole RocksDB database if you enable
checkpointing, which could take a while. The good news there is that
RocksDB checkpointing works in two phases: a short, synchronous phase where
we draw a local backup of the RocksDB database, and a longer, asynchronous
phase where we put that snapshot into HDFS.
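
As a rough illustration, a minimal sketch of such a setup (the checkpoint
interval and HDFS path are placeholder values; RocksDBStateBackend comes
from the flink-statebackend-rocksdb dependency):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// checkpoint every 60 seconds: RocksDB draws a short synchronous local
// backup, then uploads the snapshot to HDFS asynchronously
env.enableCheckpointing(60000)
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))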

Now, regarding Andrew's question: one way I see of solving this is
splitting the stream into two streams: one that contains elements that are
more or less on time, and another one for elements that are hopelessly
late. The former stream you could process with the normal windowing
mechanisms, while for the latter you do special-case handling, such as
going to an external store and updating a value there, what have you.
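
As a rough sketch of that split (the Event type, its eventTime field, and
the one-day threshold are assumptions for illustration; lateness is
measured against processing time for simplicity):

// anything whose event time lags processing time by more than a day is
// treated as hopelessly late
val maxLatenessMs = 24 * 60 * 60 * 1000L

def isHopelesslyLate(e: Event): Boolean =
  System.currentTimeMillis() - e.eventTime > maxLatenessMs

val onTime = events.filter(e => !isHopelesslyLate(e)) // normal windowing
val late = events.filter(e => isHopelesslyLate(e))    // update external store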


Re: streaming join implementation

Posted by Andrew Coates <bi...@gmail.com>.
Extending on what Henry is asking... what if data can be more than a day
late, or, in a more streaming fashion, what if updates can come through for
previous values?

This would obviously involve storing a great deal of state. The use case
I'm thinking of has very large volumes per day, so an external store would
be needed to hold the state.

But is this something Flink could do well?


Re: streaming join implementation

Posted by Henry Cai <hc...@pinterest.com>.
Cogroup is nice, thanks.

But if I define a tumbling window of one day, does that mean Flink needs to
cache all the data for one day in memory?  I have about 5 TB of data coming
in per day.  About 50% of records will find a matching record (the other
50% won't).



Re: streaming join implementation

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
right now, Flink does not give you a way to get the records that were not
joined in a join. You can, however, use a co-group operation instead of a
join to figure out which records did not join with records from the other
side and treat them separately.

Let me show an example:

val input1: DataStream[A] = ...
val input2: DataStream[B] = ...

val result = input1.coGroup(input2)
  .where(_.key1)
  .equalTo(_.key2)
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .apply(new MyCoGroupFunction)

class MyCoGroupFunction extends CoGroupFunction[A, B, O] {
  override def coGroup(first: java.lang.Iterable[A],
                       second: java.lang.Iterable[B],
                       out: Collector[O]): Unit = {
    if (!first.iterator().hasNext) {
      // no element from the first input matched
      out.collect(<message telling that I only have second input elements>)
    } else if (!second.iterator().hasNext) {
      // no element from the second input matched
      out.collect(<message telling that I only have first input elements>)
    } else {
      // perform the actual join using the two iterables
    }
  }
}

The result will be a stream that contains both join results and the
elements telling you that something didn't join. You can process this
stream further by splitting it into separate streams of proper join
results and non-joined elements, and so on.
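
A sketch of that last step (the isJoined marker on the output type O is a
hypothetical field, not part of the example above):

val splitStream = result.split(o => if (o.isJoined) Seq("joined") else Seq("unjoined"))
val joinResults = splitStream.select("joined")
val nonJoined = splitStream.select("unjoined")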

I hope this helps somewhat.

Cheers,
Aljoscha

Re: streaming join implementation

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Let me give you a specific example: say event1 on stream1 happened within
your 0-5 min window with key1, and event2 on stream2, with a key2 that
could have matched key1, happened at 5:01, outside the join window. Now you
will have to correlate event2 on stream2 with event1 on stream1, which
happened in the previous window; this was the corner case I mentioned
before. I am not aware whether Flink can solve this problem for you; that
would be nice, instead of solving it in the application.
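
One hedged workaround, not something the thread confirms: sliding windows
let boundary-straddling pairs such as 4:59/5:01 share a window, at the cost
of duplicate join results for pairs that fall into two windows:

val output: DataStream[OutputData] = stream1.join(stream2)
  .where(_.key1)
  .equalTo(_.key2)
  // 10-minute windows sliding every 5 minutes: 4:59 and 5:01 both land in
  // the 0:00-0:10 window, though pairs further apart can still be missed
  .window(SlidingEventTimeWindows.of(Time.minutes(10), Time.minutes(5)))
  .apply(new SomeJoinFunction)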


Re: streaming join implementation

Posted by Henry Cai <hc...@pinterest.com>.
Thanks Balaji.  Do you mean you spill the non-matching records into Redis
after 5 minutes?  Does Flink give you control over which records did not
match in the current window, so that you can copy them into long-term
storage?




Re: streaming join implementation

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
You can implement a join in Flink (it is an inner join) with the pseudo
code below. This join is for a 5-minute interval; yes, there will be some
corner cases where data arriving after 5 minutes is missed by the join
window. I actually solved this problem by storing some data in Redis and
writing correlation logic to take care of the corner cases that were missed
by the join window.

val output: DataStream[OutputData] = stream1.join(stream2)
  .where(_.key1)
  .equalTo(_.key2)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .apply(new SomeJoinFunction)
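
For completeness, a sketch of what the join function could look like (the
input/output types and their fields are illustrative, not from the thread):

class SomeJoinFunction extends JoinFunction[Input1, Input2, OutputData] {
  // called once per matching pair of elements within the window
  override def join(first: Input1, second: Input2): OutputData =
    OutputData(first.key1, first.payload, second.payload)
}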

