Posted to dev@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2018/10/17 12:52:03 UTC

Integrating Stateful DoFns from the Python SDK

Hi everyone,

While integrating portable state with the FlinkRunner, I hit a problem 
and wanted to get your opinion.

Stateful DoFns require their input to be KV records. The reason for this 
is that state is isolated by key. The (non-portable) FlinkRunner uses 
Flink's `keyBy(key)` construct to partition state by key [1].
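
For illustration, a minimal, hedged sketch of what such a stateful DoFn looks
like in the Python SDK (the class name and state spec below are made up):
`process` receives a `(key, value)` pair, and the runner scopes the state cell
to that key, which is why the input must be KV-shaped.

   import apache_beam as beam
   from apache_beam.coders import VarIntCoder
   from apache_beam.transforms.userstate import BagStateSpec

   class BufferingDoFn(beam.DoFn):
       # Hypothetical stateful DoFn: the bag state below is kept separately
       # for every key of the incoming (key, value) elements, which is why
       # the input must be a KV pair.
       BUFFER = BagStateSpec('buffer', VarIntCoder())

       def process(self, element, buffer=beam.DoFn.StateParam(BUFFER)):
           key, value = element
           buffer.add(value)
           yield key, value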

That works fine for portable Java pipelines, where we enforce the `KV` 
class for stateful DoFns. After running tests with the Python SDK, I 
concluded that tuples, e.g. `(key, value)`, which are used for KV 
functionality, do not go through the KvCoder but are encoded with a 
byte array coder.
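
To make the encoding difference concrete, a small, hedged illustration (the
exact coder classes returned depend on the SDK version): asking the coder
registry for a coder without a KV hint yields the generic fallback coder,
while an explicit KV hint yields a composite coder with a separate key
component that a runner could use.

   import apache_beam as beam
   from apache_beam import coders, typehints

   # Without a type hint the element type is effectively Any, so the SDK
   # falls back to a generic (pickle-based) coder -- opaque bytes to a runner.
   print(coders.registry.get_coder(typehints.Any))

   # With an explicit KV hint the registry can produce a composite coder
   # with separate key and value components.
   print(coders.registry.get_coder(typehints.KV[str, int]))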

How do we infer the key in the Runner from an opaque sequence of bytes? 
Should we also require the KvCoder for stateful DoFns in the Python SDK?

Thanks,
Max

[1] 
https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471

Re: Integrating Stateful DoFns from the Python SDK

Posted by Robert Bradshaw <ro...@google.com>.
I think Create() simply ignores any type hints it's given (which should be
fixed).
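
For what it's worth, a hedged sketch of one possible workaround, not something
confirmed in this thread: pass the elements through an identity Map that
carries the KV hint, so the hint is attached to a ParDo rather than to Create
itself.

   import apache_beam as beam
   from apache_beam import typehints

   with beam.Pipeline() as p:
       _ = (p
            | beam.Create([('a', 1), ('b', 2)])
            # Create's own hint may be dropped, so restate the KV type on an
            # identity ParDo; the stateful ParDo from the quoted examples
            # below would then follow this step.
            | 'ReifyKV' >> beam.Map(lambda kv: kv).with_output_types(
                typehints.KV[str, int]))

Whether this actually produces a KvCoder on the portable path would still need
to be verified against the runner.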

On Wed, Oct 17, 2018 at 4:17 PM Maximilian Michels <mx...@apache.org> wrote:

> Type hints turn out to be not so predictable:
>
> 1) WORKS
>    p | beam.Impulse() \
>      | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V]) \
>      | "statefulParDo" >> beam.ParDo(AddIndex())
>
> 2) DOES NOT (no KvCoder)
>    p | beam.Create(inputs).with_output_types(typehints.KV[K, V]) \
>      | "statefulParDo" >> beam.ParDo(AddIndex())
>
> Do you know a way to make 2) work, i.e. set the KvCoder for the Create?
>
>
> In the first example, the Create runs in a ParDo, in the second example
> On 17.10.18 15:34, Maximilian Michels wrote:
> > Thanks Robert. I was able to get it working by adding this to the
> > transform before my stateful DoFn:
> >
> >    .with_output_types(typehints.KV[K, V])
> >
> > For some reason `.with_input_types(typehints.KV[K, V])` on my stateful
> > DoFn did not work.
> >
> > Until we enforce KV during pipeline construction, we will have to throw
> > an informative exception in the Runner.
> >
> > On 17.10.18 15:03, Robert Bradshaw wrote:
> >> Yes, we should be enforcing keyness (and use of KeyCoder with)
> >> stateful DoFns, similar to what we do for GBKs. See e.g.
> >> https://github.com/apache/beam/pull/6304#issuecomment-421935375
> >>
> >> (This possibly relates to a long-standing issue that the coder
> >> inference should be moved up into construction, or at least before we
> >> pass the graph to the runner.)
> >>
> >> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <mxm@apache.org
> >> <ma...@apache.org>> wrote:
> >>
> >>     Hi everyone,
> >>
> >>     While integrating portable state with the FlinkRunner, I hit a problem
> >>     and wanted to get your opinion.
> >>
> >>     Stateful DoFns require their input to be KV records. The reason for this
> >>     is that state is isolated by key. The (non-portable) FlinkRunner uses
> >>     Flink's `keyBy(key)` construct to partition state by key [1].
> >>
> >>     That works fine for portable Java pipelines where we enforce the `KV`
> >>     class for Stateful DoFns. After running tests with the Python SDK, I
> >>     came to the conclusion that tuples, e.g. `(key, value)` which are used
> >>     for KV functionality, do not go through the KvCoder but are encoded
> >>     using a byte array encoder.
> >>
> >>     How do we infer the key in the Runner from an opaque sequence of bytes?
> >>     Should we also require the KvCoder for stateful DoFns in the Python SDK?
> >>
> >>     Thanks,
> >>     Max
> >>
> >>     [1]
> >>     https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
> >>
> >>
>

Re: Integrating Stateful DoFns from the Python SDK

Posted by Maximilian Michels <mx...@apache.org>.
Type hints turn out to be not so predictable:

1) WORKS
   p | beam.Impulse() \
     | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V]) \
     | "statefulParDo" >> beam.ParDo(AddIndex())

2) DOES NOT (no KvCoder)
   p | beam.Create(inputs).with_output_types(typehints.KV[K, V]) \
     | "statefulParDo" >> beam.ParDo(AddIndex())

Do you know a way to make 2) work, i.e. set the KvCoder for the Create?


In the first example, the Create runs in a ParDo, in the second example
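
For completeness, a self-contained, hedged version of variant 1) above;
`MyCreate` and `AddIndex` are hypothetical stand-ins, since their
implementations are not shown in this thread.

   import apache_beam as beam
   from apache_beam import typehints
   from apache_beam.coders import VarIntCoder
   from apache_beam.transforms.userstate import BagStateSpec

   K = str
   V = int

   class MyCreate(beam.DoFn):
       # Hypothetical stand-in: expand the single impulse element into KV pairs.
       def process(self, unused_element):
           yield 'a', 1
           yield 'b', 2
           yield 'a', 3

   class AddIndex(beam.DoFn):
       # Hypothetical stand-in stateful DoFn: count, per key, how many values
       # have been seen so far and emit that index alongside the value.
       SEEN = BagStateSpec('seen', VarIntCoder())

       def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
           key, value = element
           index = len(list(seen.read()))
           seen.add(1)
           yield key, (index, value)

   with beam.Pipeline() as p:
       _ = (p
            | beam.Impulse()
            | beam.ParDo(MyCreate()).with_output_types(typehints.KV[K, V])
            | 'statefulParDo' >> beam.ParDo(AddIndex()))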
On 17.10.18 15:34, Maximilian Michels wrote:
> Thanks Robert. I was able to get it working by adding this to the 
> transform before my stateful DoFn:
> 
>    .with_output_types(typehints.KV[K, V])
> 
> For some reason `.with_input_types(typehints.KV[K, V])` on my stateful 
> DoFn did not work.
> 
> Until we enforce KV during pipeline construction, we will have to throw 
> an informative exception in the Runner.
> 
> On 17.10.18 15:03, Robert Bradshaw wrote:
>> Yes, we should be enforcing keyness (and use of KeyCoder with) 
>> stateful DoFns, similar to what we do for GBKs. See e.g. 
>> https://github.com/apache/beam/pull/6304#issuecomment-421935375
>>
>> (This possibly relates to a long-standing issue that the coder 
>> inference should be moved up into construction, or at least before we 
>> pass the graph to the runner.)
>>
>> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <mxm@apache.org 
>> <ma...@apache.org>> wrote:
>>
>>     Hi everyone,
>>
>>     While integrating portable state with the FlinkRunner, I hit a problem
>>     and wanted to get your opinion.
>>
>>     Stateful DoFns require their input to be KV records. The reason for this
>>     is that state is isolated by key. The (non-portable) FlinkRunner uses
>>     Flink's `keyBy(key)` construct to partition state by key [1].
>>
>>     That works fine for portable Java pipelines where we enforce the `KV`
>>     class for Stateful DoFns. After running tests with the Python SDK, I
>>     came to the conclusion that tuples, e.g. `(key, value)` which are used
>>     for KV functionality, do not go through the KvCoder but are encoded
>>     using a byte array encoder.
>>
>>     How do we infer the key in the Runner from an opaque sequence of bytes?
>>     Should we also require the KvCoder for stateful DoFns in the Python SDK?
>>
>>     Thanks,
>>     Max
>>
>>     [1]
>>     https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
>>
>>

Re: Integrating Stateful DoFns from the Python SDK

Posted by Maximilian Michels <mx...@apache.org>.
Thanks Robert. I was able to get it working by adding this to the 
transform before my stateful DoFn:

   .with_output_types(typehints.KV[K, V])

For some reason `.with_input_types(typehints.KV[K, V])` on my stateful 
DoFn did not work.

Until we enforce KV during pipeline construction, we will have to throw 
an informative exception in the Runner.
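
As a rough illustration of the kind of informative error meant here (the real
check would live in the runner, e.g. the Java FlinkRunner; the helper name and
logic below are made up):

   from apache_beam.typehints import typehints

   def require_kv_input(pcoll, label='stateful ParDo'):
       # Hypothetical construction-time check: fail with a clear message when
       # the input to a stateful ParDo is not declared as a two-part KV,
       # since the runner needs the key to partition state.
       element_type = pcoll.element_type
       is_kv = (isinstance(element_type, typehints.TupleConstraint)
                and len(element_type.tuple_types) == 2)
       if not is_kv:
           raise ValueError(
               '%s requires a KV input so state can be partitioned by key, '
               'but got element type %s. Consider adding '
               '.with_output_types(typehints.KV[key_type, value_type]) to '
               'the upstream transform.' % (label, element_type))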

On 17.10.18 15:03, Robert Bradshaw wrote:
> Yes, we should be enforcing keyness (and use of KeyCoder with) stateful 
> DoFns, similar to what we do for GBKs. See e.g. 
> https://github.com/apache/beam/pull/6304#issuecomment-421935375
> 
> (This possibly relates to a long-standing issue that the coder inference 
> should be moved up into construction, or at least before we pass the 
> graph to the runner.)
> 
> On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <mxm@apache.org 
> <ma...@apache.org>> wrote:
> 
>     Hi everyone,
> 
>     While integrating portable state with the FlinkRunner, I hit a problem
>     and wanted to get your opinion.
> 
>     Stateful DoFns require their input to be KV records. The reason for this
>     is that state is isolated by key. The (non-portable) FlinkRunner uses
>     Flink's `keyBy(key)` construct to partition state by key [1].
> 
>     That works fine for portable Java pipelines where we enforce the `KV`
>     class for Stateful DoFns. After running tests with the Python SDK, I
>     came to the conclusion that tuples, e.g. `(key, value)` which are used
>     for KV functionality, do not go through the KvCoder but are encoded
>     using a byte array encoder.
> 
>     How do we infer the key in the Runner from an opaque sequence of bytes?
>     Should we also require the KvCoder for stateful DoFns in the Python SDK?
> 
>     Thanks,
>     Max
> 
>     [1]
>     https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
> 

Re: Integrating Stateful DoFns from the Python SDK

Posted by Robert Bradshaw <ro...@google.com>.
Yes, we should be enforcing keyness (and the use of KvCoder) with stateful
DoFns, similar to what we do for GBKs. See e.g.
https://github.com/apache/beam/pull/6304#issuecomment-421935375

(This possibly relates to a long-standing issue that the coder inference
should be moved up into construction, or at least before we pass the graph
to the runner.)
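
As a hedged sketch of the GBK analogy: GroupByKey declares KV-shaped type
hints on itself, and the same expectation could be declared on a stateful DoFn
so the type checker can flag non-KV input at construction time. Whether that
also yields a KvCoder is exactly the open question in this thread.

   import apache_beam as beam
   from apache_beam import typehints

   # Declaring the KV expectation on the DoFn itself, analogous to the KV
   # type hints GroupByKey carries; the class below is illustrative only.
   @typehints.with_input_types(typehints.KV[str, int])
   class KeyedDoFn(beam.DoFn):
       def process(self, element):
           key, value = element
           yield key, value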

On Wed, Oct 17, 2018 at 2:52 PM Maximilian Michels <mx...@apache.org> wrote:

> Hi everyone,
>
> While integrating portable state with the FlinkRunner, I hit a problem
> and wanted to get your opinion.
>
> Stateful DoFns require their input to be KV records. The reason for this
> is that state is isolated by key. The (non-portable) FlinkRunner uses
> Flink's `keyBy(key)` construct to partition state by key [1].
>
> That works fine for portable Java pipelines where we enforce the `KV`
> class for Stateful DoFns. After running tests with the Python SDK, I
> came to the conclusion that tuples, e.g. `(key, value)` which are used
> for KV functionality, do not go through the KvCoder but are encoded
> using a byte array encoder.
>
> How do we infer the key in the Runner from an opaque sequence of bytes?
> Should we also require the KvCoder for stateful DoFns in the Python SDK?
>
> Thanks,
> Max
>
> [1]
>
> https://github.com/apache/beam/blob/22f59deaf2a0aa77d98f2f024ce4b2a399014382/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java#L471
>