Posted to user@flink.apache.org by Michael Warnock <mi...@ripple.com> on 2016/08/23 18:03:42 UTC

sharded state, 2-step operation

I'm trying to do something that seems like it should be possible, but my
implementation doesn't behave as expected, and I'm not sure how else to
express it.

Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and I
want to keyBy(0), flatMap with state associated with Alice, then keyBy(1)
with state associated with Bob.  The trick is, when I later get a tuple
like (Bob, Alice, 1), I want the first operator to see the state that was
updated in the second op previously.  Is this possible?  I tried
implementing both operators as one, getting the state by descriptor in the
flatMap body, and even instantiating the operator only once; the behavior
is, as you might guess, that the state in stage 1 doesn't include changes
made previously in stage 2.
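A small plain-Python model (not Flink code) of the behavior described above: each keyed operator owns a private per-key state table, so an update made in stage 2 is invisible to stage 1. The class name, the running-total state and the 0-based key positions are illustrative assumptions.

```python
from collections import defaultdict


class KeyedOperator:
    """Toy stand-in for one keyed operator: it owns a private per-key
    state table that no other operator can read or write."""

    def __init__(self, key_index: int):
        self.key_index = key_index      # 0-based tuple position used as the key
        self.state = defaultdict(int)   # key -> running total (illustrative state)

    def process(self, record):
        key = record[self.key_index]
        seen = self.state[key]          # what this operator sees for the key
        self.state[key] = seen + record[2]
        return seen


stage1 = KeyedOperator(key_index=0)  # keyed on the first field ("Alice")
stage2 = KeyedOperator(key_index=1)  # keyed on the second field ("Bob")

first = ("Alice", "Bob", 1)
stage1.process(first)
stage2.process(first)                # stage 2 now holds state for "Bob"

# Stage 1 looks up "Bob" in its own table, not in stage 2's.
seen_by_stage1 = stage1.process(("Bob", "Alice", 1))
print(seen_by_stage1, stage2.state["Bob"])  # 0 1
```

Reusing one function object for both stages does not change the picture: each stage is still a separate operator with its own keyed state.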

Is there any way to do this without throwing away the parallelism?

Thanks in advance!
~Michael

Re: sharded state, 2-step operation

Posted by Stephan Ewen <se...@apache.org>.
Hi!

The "feedback loop" sounds like a solution, yes. Actually, that works well
with the CoMap / CoFlatMap functions: one input to the CoMap would be the
original value, the other input the feedback value.

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations
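A plain-Python model (not Flink API) of that feedback loop: one operator keyed by name takes original records on input 1 and its own step-2 records on input 2, so both steps update the same per-key state. In Flink terms the two inputs correspond to flatMap1 and flatMap2 of a CoFlatMapFunction. The deque named feedback stands in for the loop-back channel (for example a Kafka topic), and always draining feedback before the next original record is a simplifying assumption, not an ordering guarantee a real job gives.

```python
from collections import defaultdict, deque


class TwoStepOperator:
    """Toy model of one keyed two-input operator; both inputs share `state`."""

    def __init__(self):
        self.state = defaultdict(int)  # name -> illustrative counter

    def on_original(self, record, feedback):
        # Input 1: keyed by the first field (e.g. "Alice").
        a, b, amount = record
        self.state[a] -= amount
        # Instead of a downstream keyBy on field 1, send the step-2 work back.
        feedback.append((a, b, amount))

    def on_feedback(self, record):
        # Input 2: keyed by the second field (e.g. "Bob"), same state table.
        _, b, amount = record
        self.state[b] += amount


def run(originals):
    op = TwoStepOperator()
    source = deque(originals)
    feedback = deque()  # stand-in for the loop-back channel
    seen_by_step1 = []
    while source or feedback:
        # Simplification: pending feedback is always handled first.
        if feedback:
            op.on_feedback(feedback.popleft())
        else:
            rec = source.popleft()
            seen_by_step1.append((rec[0], op.state[rec[0]]))
            op.on_original(rec, feedback)
    return dict(op.state), seen_by_step1


state, seen = run([("Alice", "Bob", 1), ("Bob", "Alice", 1)])
print(seen)   # [('Alice', 0), ('Bob', 1)]
print(state)  # {'Alice': 0, 'Bob': 0}
```

The second record's step 1 sees Bob's step-2 update from the first record. Since every record re-enters keyed by name, each parallel instance would only touch the keys routed to it.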


Once Flink's iterations are better hardened, these could be used for
feedback as well, and would give exactly-once guarantees.

Best,
Stephan



On Tue, Aug 23, 2016 at 9:05 PM, Michael Warnock <mi...@ripple.com> wrote:

> Another approach I'm considering, which feels pretty kludgy, but I think
> should be acceptable for my current use:
>
> Only one stateful op, keyed on the same field, but with a flag field
> indicating the actual operation to be performed.  The results of this op
> are output to a Kafka (or whatever) queue, which is ingested along with the
> first stream.  The two state changes don't have to be atomic for my case,
> but the second one does have to be guaranteed to eventually happen, and be
> idempotent.  I'm not quite sure how to (safely) make that second pass
> idempotent though, at the moment, and I'm not sure if there might be other
> issues with it I'm not seeing - it definitely doesn't _feel_ like a great
> solution.
>
> Any thoughts?

Re: sharded state, 2-step operation

Posted by Michael Warnock <mi...@ripple.com>.
Another approach I'm considering, which feels pretty kludgy, but I think
should be acceptable for my current use:

Only one stateful op, keyed on the same field, but with a flag field
indicating the actual operation to be performed.  The results of this op
are output to a Kafka (or whatever) queue, which is ingested along with the
first stream.  The two state changes don't have to be atomic for my case,
but the second one does have to be guaranteed to eventually happen, and be
idempotent.  I'm not quite sure how to (safely) make that second pass
idempotent though, at the moment, and I'm not sure if there might be other
issues with it I'm not seeing - it definitely doesn't _feel_ like a great
solution.
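A plain-Python sketch of this single-operator, flag-field variant, under assumed names: each event gets a unique op_id when it first enters, step 1 publishes a step-2 command to a queue (a list standing in for the Kafka topic), and the operator remembers per key which step-2 op_ids it has already applied, so a redelivered command is ignored. This ID-based deduplication is one common way to make the second pass idempotent; it is not something proposed in the thread.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Command:
    op_id: str       # hypothetical unique id, assigned when the event first enters
    key: str         # the single field the stateful operator is keyed on
    step: int        # the flag field: 1 = first change, 2 = fed-back second change
    amount: int
    other: str = ""  # step 1 only: key that the step-2 command must target


class SingleStatefulOp:
    def __init__(self):
        self.balance = defaultdict(int)   # key -> illustrative state value
        self.applied = defaultdict(set)   # key -> op_ids of step-2 commands applied

    def process(self, cmd: Command, queue: list) -> None:
        if cmd.step == 1:
            self.balance[cmd.key] -= cmd.amount
            # Publish the second step; it comes back in later as a new input record.
            queue.append(Command(cmd.op_id, cmd.other, 2, cmd.amount))
        elif cmd.op_id not in self.applied[cmd.key]:
            self.balance[cmd.key] += cmd.amount
            self.applied[cmd.key].add(cmd.op_id)
        # Otherwise: a duplicate delivery of a step-2 command, so do nothing.


op = SingleStatefulOp()
queue = []
op.process(Command("t1", "Alice", 1, 5, other="Bob"), queue)
step2 = queue[0]
op.process(step2, queue)
op.process(step2, queue)          # redelivered, e.g. after a failure
print(op.balance["Alice"], op.balance["Bob"])  # -5 5
```

In a real job the applied set would have to live in the operator's keyed state so it survives restores, and it grows without bound unless old ids are expired.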

Any thoughts?

On Tue, Aug 23, 2016 at 11:53 AM, Michael Warnock <mi...@ripple.com>
wrote:

> Thanks for the quick response!
>
> I've been wondering about Connected streams and CoFlatMap, but either I
> don't see all the ways they can be used, or they don't solve my problem.
> Do you know of any examples outside of the documentation?  My searches for
> "flink comap example" and similar haven't turned anything up.

Re: sharded state, 2-step operation

Posted by Michael Warnock <mi...@ripple.com>.
Thanks for the quick response!

I've been wondering about Connected streams and CoFlatMap, but either I
don't see all the ways they can be used, or they don't solve my problem.
Do you know of any examples outside of the documentation?  My searches for
"flink comap example" and similar haven't turned anything up.

On Tue, Aug 23, 2016 at 11:41 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> This is a tricky one. State access and changes are not shared across
> operators in Flink.
> We chose that design because it makes it possible to work on "local" state
> in each operator:
>   - state automatically shards with the computation
>   - no locking / concurrency implications
>   - asynchronous persistence
>
> Sharing state between two operations in the same stage works with the
> CoMap / CoFlatMap functions.
> Sharing state across successive nodes does not work, because the functions
> could be executed on different machines and one would need to do remote and
> synchronized state updates that way.
>
> Do you think you can use the CoMap / CoFlatMap functions for this?
>
> Greetings,
> Stephan

Re: sharded state, 2-step operation

Posted by Stephan Ewen <se...@apache.org>.
Hi!

This is a tricky one. State access and changes are not shared across
operators in Flink.
We chose that design because it makes it possible to work on "local" state
in each operator:
  - state automatically shards with the computation
  - no locking / concurrency implications
  - asynchronous persistence
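A simplified plain-Python model of "state automatically shards with the computation": a stable hash of the key picks the one parallel instance that processes that key and holds its state, so updates stay local and need no locks. The parallelism of 4 and the CRC32 hash are assumptions for illustration; Flink's actual key-to-instance assignment is different.

```python
import zlib
from collections import defaultdict

PARALLELISM = 4  # assumed number of parallel instances of one keyed operator


def owner(key: str, parallelism: int = PARALLELISM) -> int:
    # Stable hash -> exactly one owning instance per key.
    return zlib.crc32(key.encode("utf-8")) % parallelism


# One private state table per parallel instance.
instance_state = [defaultdict(int) for _ in range(PARALLELISM)]


def process(record) -> int:
    key, _, amount = record
    i = owner(key)
    instance_state[i][key] += amount  # purely local update: no lock, no remote call
    return i


records = [("Alice", "Bob", 1), ("Alice", "Carol", 2), ("Bob", "Alice", 1)]
routes = [process(r) for r in records]
```

Both Alice records land on the same instance, while Bob's may land on another one, possibly on another machine. CRC32 is used instead of Python's built-in hash() because string hashing is randomized per process.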

Sharing state between two operations in the same stage works with the
CoMap / CoFlatMap functions.
Sharing state across successive nodes does not work, because the functions
could be executed on different machines and one would need to do remote and
synchronized state updates that way.
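A plain-Python model (not Flink API) of sharing state between two operations in one stage: a single operator with two keyed inputs, where the same record is fed to input 1 keyed on the first field and to input 2 keyed on the second field. The method names mirror flatMap1 / flatMap2 of Flink's CoFlatMapFunction and the out list plays the role of its Collector; the balance state is an illustrative assumption.

```python
from collections import defaultdict

Record = tuple  # (first_name, second_name, amount)


class BalanceCoFlatMap:
    """Toy model of one keyed two-input operator: both handlers read and
    write the same per-key state, because they belong to one operator."""

    def __init__(self):
        self.balance = defaultdict(int)  # key -> illustrative state value

    def flat_map1(self, record: Record, out: list) -> None:
        # Input 1 is keyed on field 0.
        key, _, amount = record
        self.balance[key] -= amount
        out.append((key, self.balance[key]))

    def flat_map2(self, record: Record, out: list) -> None:
        # Input 2 is keyed on field 1.
        _, key, amount = record
        self.balance[key] += amount
        out.append((key, self.balance[key]))


op = BalanceCoFlatMap()
out = []
first = ("Alice", "Bob", 1)
op.flat_map1(first, out)   # step keyed on Alice
op.flat_map2(first, out)   # step keyed on Bob, same state table
op.flat_map1(("Bob", "Alice", 1), out)  # sees Bob's earlier update
print(out)  # [('Alice', -1), ('Bob', 1), ('Bob', 0)]
```

Unlike two chained keyed operators, the third call sees the update made through the other input. If step 2 needs the result of step 1, the two inputs can no longer both be fed from the original stream, which is where the feedback loop discussed in this thread comes in.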

Do you think you can use the CoMap / CoFlatMap functions for this?

Greetings,
Stephan


On Tue, Aug 23, 2016 at 8:03 PM, Michael Warnock <mi...@ripple.com> wrote:

> I'm trying to do something that seems like it should be possible, but my
> implementation doesn't behave as expected, and I'm not sure how else to
> express it.
>
> Let's say the stream is composed of tuples like this: (Alice, Bob, 1) and
> I want to keyBy(0), flatMap with state associated with Alice, then keyBy(1)
> with state associated with Bob.  The trick is, when I later get a tuple
> like (Bob, Alice, 1), I want the first operator to see the state that was
> updated in the second op previously.  Is this possible?  I tried
> implementing both operators as one, getting the state by descriptor in the
> flatMap body, and even instantiating the operator only once; the behavior
> is, as you might guess, that the state in stage 1 doesn't include changes
> made previously in stage 2.
>
> Is there any way to do this without throwing away the parallelism?
>
> Thanks in advance!
> ~Michael
>