Posted to user@flink.apache.org by Dmitry Golubets <dg...@gmail.com> on 2017/01/16 23:20:43 UTC

Three input stream operator and back pressure

Hi,

There are only two interfaces defined at the moment:
OneInputStreamOperator
and
TwoInputStreamOperator.

Is there any way to define an operator with an arbitrary number of inputs?

My other concern is how to maintain backpressure in the operator.
Let's say I read events from two Kafka sources, both of which are ordered
by time. I want to merge them while keeping the global order. But to do
that, I need to block one input if the other one has no data yet.

Best regards,
Dmitry

Re: Three input stream operator and back pressure

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Just to avoid confusion: the DataStream network readers do not currently
support backpressuring only one input (as this conflicts with other design
aspects). (The DataSet network readers do support that, FYI.)

How about simply "correcting" the order later? If you have pre-sorted data
per stream, you can generate frequent watermarks trivially (every 100 ms,
based on the event timestamp that you would use for the merge) and then
apply event-time windows of, say, 100 ms, inside which you sort and emit the
elements. The windows are strictly evaluated in order, so the resulting
stream is sorted. This would be similar to a form of incremental
"bucketing" sort over the merged stream.

That will give you a sorted stream easily, and may not even be too
expensive.
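
The bucketing sort described above can be sketched in plain Java, without
any Flink dependency. This is only an illustration of the idea, not Flink's
window implementation: the class BucketingSorter and its methods onElement
and onWatermark are hypothetical names, events are reduced to bare
timestamps, and the caller is assumed to pass a watermark that is already
safe for both inputs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration in plain Java; this is not a Flink class. */
final class BucketingSorter {
    private final long windowSizeMs;
    // Buffered timestamps per window, keyed by window start, in window order.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    private final List<Long> output = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    BucketingSorter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    /** Buffers one event timestamp; returns false if its window already fired. */
    boolean onElement(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long windowMaxTimestamp = windowStart + windowSizeMs - 1;
        if (windowMaxTimestamp <= currentWatermark) {
            return false; // late element, dropped in this sketch
        }
        windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
        return true;
    }

    /** Fires, in order, every window that the watermark has fully passed. */
    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!windows.isEmpty()) {
            Map.Entry<Long, List<Long>> first = windows.firstEntry();
            if (first.getKey() + windowSizeMs - 1 > currentWatermark) {
                break; // the earliest window is still open
            }
            List<Long> bucket = windows.pollFirstEntry().getValue();
            Collections.sort(bucket); // sort inside the window, then emit
            output.addAll(bucket);
        }
    }

    List<Long> output() {
        return output;
    }

    public static void main(String[] args) {
        BucketingSorter sorter = new BucketingSorter(100);
        for (long ts : new long[] {10, 150, 40, 120, 90}) {
            sorter.onElement(ts);
        }
        sorter.onWatermark(199);
        System.out.println(sorter.output());
    }
}
```

The cost of this approach is latency: an element is held back until the
watermark passes the end of its window, so smaller windows and more
frequent watermarks reduce the delay.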

Stephan


Re: Three input stream operator and back pressure

Posted by Dmitry Golubets <dg...@gmail.com>.
Hi Stephan,

In one of our components we have to process events in order, due to
business logic requirements.
That certainly introduces a bottleneck, but other aspects are fine.

I'm not talking about actually re-sorting the data, just about consuming it
in the right order.
I.e., if two streams are already in order, all that has to be done is to
consume the one that has the minimum element at its head and backpressure
the other one.
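
For illustration, the merge described here can be sketched in plain Java
outside of Flink. The class OrderedTwoWayMerge and its methods are
hypothetical names, and events are reduced to bare timestamps. The two
queues stand in for the inputs; the sketch buffers the side that is ahead,
whereas a real implementation would have to stop reading from it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration in plain Java; this is not a Flink operator. */
final class OrderedTwoWayMerge {
    private final Deque<Long> left = new ArrayDeque<>();
    private final Deque<Long> right = new ArrayDeque<>();
    private final List<Long> output = new ArrayList<>();
    private boolean leftDone;
    private boolean rightDone;

    void offerLeft(long timestamp) {
        left.addLast(timestamp);
        drain();
    }

    void offerRight(long timestamp) {
        right.addLast(timestamp);
        drain();
    }

    void finishLeft() {
        leftDone = true;
        drain();
    }

    void finishRight() {
        rightDone = true;
        drain();
    }

    List<Long> output() {
        return output;
    }

    /** Emits as long as the smaller head is known; otherwise waits. */
    private void drain() {
        while (true) {
            boolean leftReady = !left.isEmpty();
            boolean rightReady = !right.isEmpty();
            if (leftReady && rightReady) {
                // Both heads are visible: consume the one with the minimum timestamp.
                output.add(left.peekFirst() <= right.peekFirst()
                        ? left.pollFirst()
                        : right.pollFirst());
            } else if (leftReady && rightDone) {
                output.add(left.pollFirst());
            } else if (rightReady && leftDone) {
                output.add(right.pollFirst());
            } else {
                // One side has no data yet: the other side must be held back.
                return;
            }
        }
    }

    public static void main(String[] args) {
        OrderedTwoWayMerge merge = new OrderedTwoWayMerge();
        merge.offerLeft(1);
        merge.offerLeft(4);
        merge.offerRight(2);
        merge.offerRight(3);
        merge.finishRight();
        merge.finishLeft();
        System.out.println(merge.output());
    }
}
```

Note that nothing is emitted while one side is empty and not finished, which
is exactly the point at which the other input would need to be blocked.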

What I can do, of course, is create a custom source for it. But I would
prefer not to mix source-dependent logic (e.g. the Kafka connection) and
merging logic.

Best regards,
Dmitry

Re: Three input stream operator and back pressure

Posted by Stephan Ewen <se...@apache.org>.
Hi Dmitry!

The streaming runtime makes a conscious decision not to merge streams as in
an ordered merge.
The reason is that, at large scale, this is typically bad for scalability
and network performance.
Also, in certain DAGs, it may lead to deadlocks.

Even the two-input operator delivers records at a low level in
first-come, first-served order, as driven by network events (NIO events).

Flink's operators tolerate out-of-order records to compensate for that.
Overall, that seemed the more scalable design to us.
Can your use case follow a similar approach?

Stephan



Re: Three input stream operator and back pressure

Posted by Dmitry Golubets <dg...@gmail.com>.
Hi Timo,

I don't have any key to join on, so I'm not sure a Window Join would work
for me.

Can I implement my own "low-level" operator in some way?
I would appreciate it if you could give me a hint or a link to an example
of how to do it.



Best regards,
Dmitry

Re: Three input stream operator and back pressure

Posted by Timo Walther <tw...@apache.org>.
Hi Dmitry,

The runtime supports an arbitrary number of inputs; however, the API
does not currently provide a convenient way to use them. You could use the
"union" operator to reduce the number of inputs. Otherwise, I think you have
to implement your own operator. That depends on your use case, though.
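
One way to read the "union" suggestion, sketched in plain Java rather than
with Flink classes: since a union combines streams of a single type, each
record can first be wrapped with the index of the input it came from, and
the downstream single-input logic can then dispatch on that index. The names
TaggedUnionSketch, Tagged, tag, union and countPerInput are hypothetical,
and lists stand in for streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration in plain Java; none of these types are Flink classes. */
final class TaggedUnionSketch {

    /** A record together with the index of the logical input it came from. */
    static final class Tagged {
        final int input;
        final Object value;

        Tagged(int input, Object value) {
            this.input = input;
            this.value = value;
        }
    }

    /** Wraps every value of one logical input with that input's index. */
    static List<Tagged> tag(int input, List<?> values) {
        List<Tagged> tagged = new ArrayList<>();
        for (Object value : values) {
            tagged.add(new Tagged(input, value));
        }
        return tagged;
    }

    /** Combines the tagged inputs into one sequence of a single type. */
    static List<Tagged> union(List<List<Tagged>> inputs) {
        List<Tagged> all = new ArrayList<>();
        for (List<Tagged> input : inputs) {
            all.addAll(input);
        }
        return all;
    }

    /** Stand-in for single-input logic that dispatches on the tag. */
    static int[] countPerInput(List<Tagged> records, int numberOfInputs) {
        int[] counts = new int[numberOfInputs];
        for (Tagged record : records) {
            counts[record.input]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Tagged> merged = union(List.of(
                tag(0, List.of("a", "b")),
                tag(1, List.of(1, 2, 3)),
                tag(2, List.of(4.5))));
        System.out.println(Arrays.toString(countPerInput(merged, 3)));
    }
}
```

The concatenation order here is an artifact of the sketch; in a running job
the records of a union arrive interleaved, so this addresses the number of
inputs but not the ordering question.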

You can maintain backpressure by using Flink's operator state. But did
you also think about a Window Join instead?

I hope that helps.

Timo



