Posted to user@flink.apache.org by Gytis Žilinskas <gy...@gmail.com> on 2018/03/08 09:02:24 UTC

Event time join

Hi,

We're considering Flink for a couple of our projects, and I'm doing a
trial implementation for one of them. So far I like a lot of things,
but there are a couple of issues that I can't figure out how to
resolve. I'm not sure whether I'm misunderstanding the tool or Flink
just doesn't have the capability to do it.

We want to do an event-time join on two big Kafka streams. Both of
them might experience some issues on the other end and be delayed.
Additionally, while both are big, one (let's call it stream A) is
significantly larger than stream B.

We also know that the join window is around 5min. That is, given some
key K in stream B, if there is a counterpart in stream A, it's going
to be within +/- 5min in event time.

Since stream A is especially heavy and it's infeasible to keep hours
of it in memory, I would imagine an ideal solution where we read both
streams from Kafka and always make sure that stream B is ahead by
10min. That is, if stream A is currently ahead in watermarks, we stall
it and consume stream B until it catches up. Once the streams are
aligned in event time (with the 10min delay window), we run them both
through the join.

The problem is that I can't find a mechanism to implement that in Flink.
If I try to use a CoProcessFunction, it just consumes both streams at
the same time, ingests a lot of messages from stream A, runs out of
memory and dies.
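
The stalling rule described above can be written down as a small decision
function. This is only a plain-Python sketch of the desired policy, not
Flink code: the function name streams_to_read and the constant LEAD are
hypothetical, and watermarks are modelled as plain datetime values.

```python
from datetime import datetime, timedelta

# Assumption taken from the mail above: stream B should stay 10 minutes ahead.
LEAD = timedelta(minutes=10)


def streams_to_read(watermark_a, watermark_b, lead=LEAD):
    """Return the names of the streams that may be consumed right now.

    Stream A (the heavy one) is stalled until B's watermark is at least
    `lead` ahead of A's; stream B is never stalled.
    """
    if watermark_b - watermark_a >= lead:
        return {"A", "B"}
    return {"B"}


noon = datetime(2018, 3, 8, 12, 0)
# B is only 5 minutes ahead of A here, so A has to wait.
print(streams_to_read(noon, noon + timedelta(minutes=5)))
```

The point of the sketch is only that the decision needs both watermarks in
one place, which is exactly what a single operator reading two inputs
would have to act on.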

Any ideas on how this could be solved?

(here's a thread with a very similar problem from some time ago
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html)

Regards,
Gytis

Re: Event time join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

A Flink application does not have a problem if it ingests two streams with
very different throughput as long as they are somewhat synced on their
event-time.
This is typically the case when ingesting real-time data. In such
scenarios, an application would not buffer more data than necessary.

When reading two streams of historic data with different "density" (events
per time interval) or real-time streams that are off by some time interval,
the application needs to buffer more data to compensate for the difference
in time.
In the case of real-time streams that are off by a (more or less) fixed offset,
you should plan for the additional state requirements. Syncing sources to
the same event-time would help in both cases.
However, Flink's RocksDB state backend is also pretty good at handling very
large state sizes thanks to asynchronous and incremental checkpointing.

The window join functions of the SQL and Table API are implemented using a
CoProcessFunction and so is the new join operator that I pointed to.

Syncing sources is not really related to fault tolerance except that
additional state affects the checkpointing and recovery performance.
Pausing sources can cause problems because watermarks do not advance when
no data is ingested, but again this is not related to fault tolerance.
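
To illustrate why a paused source holds back event time: an operator with
two inputs advances its own watermark to the minimum of the watermarks it
has received on its inputs. A minimal plain-Python model of that rule
follows; the class and method names are made up for illustration and are
not Flink's API, and timestamps are plain numbers.

```python
class TwoInputWatermark:
    """Tracks the event-time watermark of an operator with two inputs."""

    def __init__(self):
        # No watermark has been seen on either input yet.
        self.inputs = [float("-inf"), float("-inf")]

    def on_watermark(self, input_index, timestamp):
        """Record a watermark from one input and return the operator's watermark."""
        # A watermark never moves backwards on an input.
        self.inputs[input_index] = max(self.inputs[input_index], timestamp)
        return self.current()

    def current(self):
        # The operator's event time is bounded by its slowest input.
        return min(self.inputs)


tracker = TwoInputWatermark()
tracker.on_watermark(1, 40)          # input 1 reports 40, then goes quiet
print(tracker.on_watermark(0, 500))  # input 0 races ahead, operator stays at 40
```

As long as the quiet input sends nothing, timers and cleanup that depend on
the operator's watermark do not fire, however far the other input runs ahead.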

The credit-based network transfer will be included in Flink 1.5. However,
this is not related to the question discussed here.
It only applies to cases where an operator cannot continue processing, for
example if the function call does not return.
An operator cannot decide to block a particular input and process the other
one.

Long story short: if you join two streams on event time, you need to buffer
the data for the join window plus the event-time difference between both
streams.
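
As a back-of-the-envelope illustration of that rule, the following sketch
turns it into arithmetic. The event rate is a made-up figure (the thread
gives no numbers), the helper name buffered_events is hypothetical, and a
constant rate is assumed.

```python
def buffered_events(events_per_second, join_window_s, skew_s):
    """Rough steady-state number of events of one stream kept in state."""
    return events_per_second * (join_window_s + skew_s)


RATE_A = 50_000          # events per second; made-up figure for stream A
JOIN_WINDOW_S = 5 * 60   # the 5-minute join window mentioned in this thread

aligned = buffered_events(RATE_A, JOIN_WINDOW_S, skew_s=0)
one_hour_skew = buffered_events(RATE_A, JOIN_WINDOW_S, skew_s=60 * 60)
print(aligned, one_hour_skew)
```

With these assumed numbers, aligned streams need roughly 15 million
buffered events, while a one-hour skew needs roughly 195 million.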

Best, Fabian


2018-03-09 9:28 GMT+01:00 Gytis Žilinskas <gy...@gmail.com>:

> Thanks to both of you for the answers and discussion.
>
> The FLIP mentions that cases where one stream is much faster than the
> other will not be handled for now either, so I guess it would still not
> solve our problems. As for the join semantics itself, I think we achieve
> the same thing with CoProcessFunction, unless I'm missing something.
>
> Anyway, a couple more questions then. It seems weird that this issue
> isn't talked about or prioritized much more. That leads me to believe
> that maybe we're misunderstanding the use case for Flink, or maybe
> other users have a different architecture / environment that doesn't
> present them with such problems. Could you describe how it is usually
> used?
>
> From the documentation and talks it looks like fault tolerance is an
> important concept in Flink, so a source pausing or slowing down is
> expected. The way I see it, these are the only options to deal with it
> at the moment:
>
> 1) have a cluster size that can buffer everything for as long as
> needed and is able to eventually catch up
> 2) model the behaviour so that the streams that are ahead can go
> through after some cutoff time
>
> Do most applications just fall into one of these behaviours?
>
> Finally, are there any ideas about extending the capabilities of the
> backpressure mechanism so that it would allow building some sort of
> functionality similar to what I described in the initial mail, with
> some prioritisation of one of the streams or other custom stalling
> behaviour? (Maybe the credit-based approach Vishal mentions? The FLIP
> document is not public, so I can't really tell.)
>
>
> Thanks again for all the help!
> Gytis

Re: Event time join

Posted by Gytis Žilinskas <gy...@gmail.com>.
Thanks to both of you for the answers and discussion.

The FLIP mentions that cases where one stream is much faster than the
other will not be handled for now either, so I guess it would still not
solve our problems. As for the join semantics itself, I think we achieve
the same thing with CoProcessFunction, unless I'm missing something.

Anyway, a couple more questions then. It seems weird that this issue
isn't talked about or prioritized much more. That leads me to believe
that maybe we're misunderstanding the use case for Flink, or maybe
other users have a different architecture / environment that doesn't
present them with such problems. Could you describe how it is usually
used?

From the documentation and talks it looks like fault tolerance is an
important concept in Flink, so a source pausing or slowing down is
expected. The way I see it, these are the only options to deal with it
at the moment:

1) have a cluster size that can buffer everything for as long as
needed and is able to eventually catch up
2) model the behaviour so that the streams that are ahead can go
through after some cutoff time
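
One possible reading of option 2 can be sketched as a buffer that gives up
on events once they are older than a cutoff. This is a plain-Python
illustration only: the class name CutoffBuffer is hypothetical, timestamps
are seconds, and events of one stream are assumed to arrive in timestamp
order.

```python
from collections import deque


class CutoffBuffer:
    """Holds events waiting for a join partner, but only up to a cutoff."""

    def __init__(self, cutoff_s):
        self.cutoff_s = cutoff_s
        self.pending = deque()  # (timestamp, event) pairs in arrival order

    def add(self, timestamp, event):
        """Buffer an event and return the events that are released unmatched."""
        self.pending.append((timestamp, event))
        released = []
        # Anything older than the cutoff, measured against the newest event,
        # is let through without waiting for the other stream any longer.
        while self.pending and self.pending[0][0] < timestamp - self.cutoff_s:
            released.append(self.pending.popleft()[1])
        return released


buffer = CutoffBuffer(cutoff_s=600)
buffer.add(0, "a")
buffer.add(300, "b")
print(buffer.add(601, "c"))  # "a" is now older than the 10-minute cutoff
```

This bounds the buffered data to roughly the cutoff's worth of events, at
the price of possibly missing matches for the released events.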

Do most applications just fall into one of these behaviours?

Finally, are there any ideas about extending the capabilities of the
backpressure mechanism so that it would allow building some sort of
functionality similar to what I described in the initial mail, with
some prioritisation of one of the streams or other custom stalling
behaviour? (Maybe the credit-based approach Vishal mentions? The FLIP
document is not public, so I can't really tell.)


Thanks again for all the help!
Gytis

On Thu, Mar 8, 2018 at 7:48 PM, Vishal Santoshi
<vi...@gmail.com> wrote:
> Yep. I think this leads to a more general question that may not be pertinent
> to https://github.com/apache/flink/pull/5342: how do we throttle a source
> if the held-back data gets unreasonably large? I know that is in itself a
> broader question, but the delayed watermarks of a slow stream accentuate
> the issue. I am curious to know how credit-based backpressure handling
> plays into this, or is that outside the realm of this discussion? And is
> credit-based backpressure handling in the 1.5 release?

Re: Event time join

Posted by Vishal Santoshi <vi...@gmail.com>.
Yep. I think this leads to a more general question that may not be pertinent
to https://github.com/apache/flink/pull/5342: how do we throttle a source
if the held-back data gets unreasonably large? I know that is in itself a
broader question, but the delayed watermarks of a slow stream accentuate
the issue. I am curious to know how credit-based backpressure handling
plays into this, or is that outside the realm of this discussion? And is
credit-based backpressure handling in the 1.5 release?

On Thu, Mar 8, 2018 at 12:23 PM, Fabian Hueske <fh...@gmail.com> wrote:

> The join would not cause backpressure but rather put all events that
> cannot be processed yet into state to process them later.
> So this works well if the data that is provided by the streams is roughly
> aligned by event time.

Re: Event time join

Posted by Fabian Hueske <fh...@gmail.com>.
The join would not cause backpressure but rather put all events that cannot
be processed yet into state to process them later.
So this works well if the data that is provided by the streams is roughly
aligned by event time.

2018-03-08 9:04 GMT-08:00 Vishal Santoshi <vi...@gmail.com>:

> Aah we have it here
> https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

Re: Event time join

Posted by Vishal Santoshi <vi...@gmail.com>.
Aah we have it here
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

On Thu, Mar 8, 2018 at 11:45 AM, Vishal Santoshi <vi...@gmail.com>
wrote:

> This is very interesting. I would imagine that there will be high
> backpressure on the LEFT source, effectively throttling it, but as things
> currently stand that is likely to affect other pipelines, as the free output
> buffers on the source side and the input buffers on the consumer side start
> blocking and get exhausted for all other pipes. I am very interested in how
> holding back the busy source does not create a pathological issue where that
> source is held back forever. Is there a FLIP for it?

Re: Event time join

Posted by Vishal Santoshi <vi...@gmail.com>.
This is very interesting. I would imagine that there will be high
backpressure on the LEFT source, effectively throttling it, but as things
currently stand that is likely to affect other pipelines, as the free output
buffers on the source side and the input buffers on the consumer side start
blocking and get exhausted for all other pipes. I am very interested in how
holding back the busy source does not create a pathological issue where that
source is held back forever. Is there a FLIP for it?

On Thu, Mar 8, 2018 at 11:29 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Gytis,
>
> Flink currently does not support holding back individual streams; for
> example, it is not possible to align streams on (offset) event-time.
>
> However, the Flink community is working on a windowed join for the
> DataStream API that only holds the relevant tail of each stream as state.
> If your join condition is +/- 5 minutes, then the join would store the last
> five minutes of both streams as state. Here's an implementation of the
> operator [1] that is close to being merged and will be available in Flink
> 1.6.0.
> Flink's SQL (and Table API) support has included this join type since
> version 1.4.0 [2].
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/pull/5342
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins

Re: Event time join

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Gytis,

Flink currently does not support holding back individual streams; for
example, it is not possible to align streams on (offset) event-time.

However, the Flink community is working on a windowed join for the
DataStream API that only holds the relevant tail of each stream as state.
If your join condition is +/- 5 minutes, then the join would store the last
five minutes of both streams as state. Here's an implementation of the
operator [1] that is close to being merged and will be available in Flink
1.6.0.
Flink's SQL (and Table API) support has included this join type since
version 1.4.0 [2].
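
The idea of keeping only the relevant tail of each stream as state can be
modelled in a few lines. The following is a plain-Python sketch under
stated assumptions, not the operator from [1]: the class IntervalJoin and
its methods are hypothetical, timestamps are seconds, and no events arrive
behind the watermark.

```python
from collections import defaultdict


class IntervalJoin:
    """Joins two keyed streams on |ts_left - ts_right| <= bound_s."""

    def __init__(self, bound_s):
        self.bound_s = bound_s
        # One buffer per side (0 = left, 1 = right): key -> [(timestamp, value)]
        self.buffers = (defaultdict(list), defaultdict(list))
        self.watermarks = [float("-inf"), float("-inf")]

    def on_event(self, side, key, timestamp, value):
        """Buffer the event and return (left, right) pairs it matches."""
        self.buffers[side][key].append((timestamp, value))
        matches = []
        for other_ts, other_value in self.buffers[1 - side].get(key, []):
            if abs(timestamp - other_ts) <= self.bound_s:
                pair = (value, other_value) if side == 0 else (other_value, value)
                matches.append(pair)
        return matches

    def on_watermark(self, side, timestamp):
        """Advance one input's watermark and drop state that cannot match anymore."""
        self.watermarks[side] = max(self.watermarks[side], timestamp)
        for s in (0, 1):
            # Events buffered on side s can only match future events of the
            # other side, whose timestamps are at or above its watermark.
            horizon = self.watermarks[1 - s] - self.bound_s
            buffer = self.buffers[s]
            for key in list(buffer):
                buffer[key] = [(ts, v) for ts, v in buffer[key] if ts >= horizon]
                if not buffer[key]:
                    del buffer[key]

    def state_size(self):
        """Total number of buffered events across both sides."""
        return sum(len(events) for buffer in self.buffers for events in buffer.values())


join = IntervalJoin(bound_s=5 * 60)
join.on_event(0, "k", 1000, "a1")
print(join.on_event(1, "k", 1200, "b1"))  # within 5 minutes of a1
```

Note that each side's buffer is cleaned up according to the other side's
watermark, so the side that runs ahead keeps accumulating state until the
slower side catches up.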

Best, Fabian

[1] https://github.com/apache/flink/pull/5342
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/sql.html#joins
