Posted to user@beam.apache.org by Wesley Tanaka <wt...@yahoo.com> on 2017/04/02 21:52:48 UTC

Conceptual foundation of State API

I'm trying to understand the State API (in 0.6.0/Java). I started with https://s.apache.org/presenting-a-new-dofn in order to understand the syntax, but am still not understanding something conceptually.  This may be related to me learning Beam before Flink/Dataflow/Apex.
Does the long-term vision of the Beam model include this technical contract as part of its semantics:
"A DoFn which uses state API MUST have an input type of KV<K,V>"  (if so, does Beam put further requirements upon the K type, e.g. does it need to implement hashCode or equals in particular ways, or require that the serialized bytes of the instances of K are equal if and only if the instances of K should share the same state cell)
In testValueStateSimple in https://github.com/apache/beam/blob/e31ca8b0d05e47c2588d5db29c92bac49aa410da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L1615 if I change the DoFn signature:

FROM: DoFn<KV<String, Integer>, Integer>
TO: DoFn<String, Integer>
Then I start getting this error, which is confusing me.  Is this ultimately caused because the above technical contract is actually required but not enforced in some kind of validation, or is this something else silly that I'm doing wrong?  :)

java.lang.NullPointerException: Outputs for non-root node Nl/ParDo(Anonymous)/ParMultiDo(Anonymous) are null
 at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:490)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
 at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
 at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.recordPipelineNodes(TestPipeline.java:166)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.afterPipelineExecution(TestPipeline.java:200)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:314)

Finally, it seems like it would be possible to add state API to the processing of any arbitrary non-KV PCollection by simply tacking on the string "hello" like in the unit tests to every value using WithKeys.  I suspect the answer will probably depend on the runner, but is there a general intuition that I could gain for what bad thing will happen if I do this, e.g. will the stateful ParDo be stuck running within a single machine, or will we run some lower layer out of memory, or will we make the network traffic between cluster nodes much more chatty and synchronized?



---
Wesley Tanaka
http://wtanaka.com/

Re: Conceptual foundation of State API

Posted by Kenneth Knowles <kl...@google.com>.
On Sun, Apr 2, 2017 at 10:30 PM, Wesley Tanaka <wt...@yahoo.com> wrote:

> Thanks Kenn,
>
> Just curious, do you think there might be any approach that would let the
> validation occur at compile time?
>

It is an admirable goal to get maximum checking at compile time. We always
consider the balance of usability, compile-time checking, and
construction-time checking (validating your graph before you start to run
it - basically "static" checking for the Beam-in-Java "language").

Java's type system can sometimes embed interesting properties, but it isn't
that great for getting fancy like you might in Scala or Haskell, so we make
tradeoffs to keep usability.

With the annotation-driven design of DoFn in the Java SDK, we have
deliberately chosen to give up a lot of compile-time checking in exchange
for vastly better construction-time checking. Neglecting to have a key when
running a stateful DoFn can and should be caught at construction time. This
is tracked by https://issues.apache.org/jira/browse/BEAM-1030.
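
To make the idea of a construction-time check concrete, here is a minimal
sketch in plain Java. It is not Beam's actual validation and makes no claim
about how BEAM-1030 is or will be implemented: the KV and DoFn types below
are hypothetical stand-ins, usesState() is an invented flag, and the check
only handles classes that extend the stand-in DoFn directly (anonymous
classes, for example).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Toy illustration only. KV and DoFn here are hypothetical stand-ins,
// not the Beam classes of the same name.
class ConstructionTimeCheckSketch {

  public static final class KV<K, V> {}

  public abstract static class DoFn<InputT, OutputT> {
    // Hypothetical flag: whether this function declares any state.
    public boolean usesState() {
      return false;
    }
  }

  // Rejects a stateful function whose declared input type is not KV<K, V>.
  // Runs when the graph is built, before any element is processed.
  public static void validate(DoFn<?, ?> fn) {
    if (!fn.usesState()) {
      return; // Stateless functions may take any input type.
    }
    Type superType = fn.getClass().getGenericSuperclass();
    if (!(superType instanceof ParameterizedType)) {
      throw new IllegalArgumentException(
          "Cannot determine the input type of " + fn.getClass().getName());
    }
    // First type argument of DoFn<InputT, OutputT> is the input type.
    Type inputType = ((ParameterizedType) superType).getActualTypeArguments()[0];
    boolean isKv =
        inputType instanceof ParameterizedType
            && ((ParameterizedType) inputType).getRawType() == KV.class;
    if (!isKv) {
      throw new IllegalArgumentException(
          "A DoFn that uses state must take KV<K, V> input, but found: "
              + inputType.getTypeName());
    }
  }
}
```

Under these assumptions, a stateful function declared over String input is
rejected with a readable message when the graph is built, instead of failing
later with a NullPointerException.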

Kenn



Re: Conceptual foundation of State API

Posted by Wesley Tanaka <wt...@yahoo.com>.
Thanks Kenn,
Just curious, do you think there might be any approach that would let the validation occur at compile time?


---
Wesley Tanaka
http://wtanaka.com/

On Sunday, April 2, 2017, 5:18:26 AM HST, Kenneth Knowles <kl...@google.com> wrote:

Hi Wesley,

On Apr 2, 2017 14:56, "Wesley Tanaka" <wt...@yahoo.com> wrote:

I'm trying to understand the State API (in 0.6.0/Java). I started with https://s.apache.org/presenting-a-new-dofn in order to understand the syntax, but am still not understanding something conceptually.  This may be related to me learning Beam before Flink/Dataflow/Apex.
Does the long term vision of Beam model have this technical contract as a part of its semantics:
"A DoFn which uses state API MUST have an input type of KV<K,V>"

Yes, this is required. The state is partitioned by key and window, so without a key we wouldn't have a well-defined partitioning.
You are correct that adding a key like "hello" to every value in a collection would suffice, but this is generally not a good idea for exactly the reason you surmised. (This is also why we don't support state without a key. Technically parallelism of stateful processing is also provided per window, but today no runner implements this in parallel)
Stateful computation occurs sequentially by definition - whatever computation reads a value that was previously written happens strictly afterwards. So by putting one key throughout your collection, you eliminate parallelism. Sometimes this could be OK for special places in your pipeline, but for big data it is not going to work.
The particular error you encountered should instead be clear and actionable, rather than a NullPointerException. I will follow up with a JIRA issue.
Kenn

  (if so, does Beam put further requirements upon the K type, e.g. does it need to implement hashCode or equals in particular ways, or require that the serialized bytes of the instances of K are equal if and only if the instances of K should share the same state cell)
In testValueStateSimple in https://github.com/apache/beam/blob/e31ca8b0d05e47c2588d5db29c92bac49aa410da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L1615 if I change the DoFn signature:

FROM: DoFn<KV<String, Integer>, Integer>
TO: DoFn<String, Integer>
Then I start getting this error, which is confusing me.  Is this ultimately caused because the above technical contract is actually required but not enforced in some kind of validation, or is this something else silly that I'm doing wrong?  :)

java.lang.NullPointerException: Outputs for non-root node Nl/ParDo(Anonymous)/ParMultiDo(Anonymous) are null
 at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:490)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
 at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
 at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.recordPipelineNodes(TestPipeline.java:166)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.afterPipelineExecution(TestPipeline.java:200)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:314)

Finally, it seems like it would be possible to add state API to the processing of any arbitrary non-KV PCollection by simply tacking on the string "hello" like in the unit tests to every value using WithKeys.  I suspect the answer will probably depend on the runner, but is there a general intuition that I could gain for what bad thing will happen if I do this, e.g. will the stateful ParDo be stuck running within a single machine, or will we run some lower layer out of memory, or will we make the network traffic between cluster nodes much more chatty and synchronized?



---
Wesley Tanaka
http://wtanaka.com/


Re: Conceptual foundation of State API

Posted by Kenneth Knowles <kl...@google.com>.
Hi Wesley,


On Apr 2, 2017 14:56, "Wesley Tanaka" <wt...@yahoo.com> wrote:

I'm trying to understand the State API (in 0.6.0/Java). I started with
https://s.apache.org/presenting-a-new-dofn in order to understand the
syntax, but am still not understanding something conceptually.  This may be
related to me learning Beam before Flink/Dataflow/Apex.

Does the long term vision of Beam model have this technical contract as a
part of its semantics:

"A DoFn which uses state API MUST have an input type of KV<K,V>"


Yes, this is required. The state is partitioned by key and window, so
without a key we wouldn't have a well-defined partitioning.

You are correct that adding a key like "hello" to every value in a
collection would suffice, but this is generally not a good idea for exactly
the reason you surmised. (This is also why we don't support state without a
key. Technically, stateful processing can also be parallelized per window,
but today no runner does so.)

Stateful computation occurs sequentially by definition - whatever
computation reads a value that was previously written happens strictly
afterwards. So by putting one key throughout your collection, you eliminate
parallelism. Sometimes this could be OK for special places in your
pipeline, but for big data it is not going to work.
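
The effect of a single constant key can be seen in a small plain-Java
simulation. This is a sketch under stated assumptions, not Beam code: the
class and method names are invented for illustration, windows are ignored,
and "one state cell per key" is modelled as one map entry per key. The
number of distinct cells is taken as an upper bound on how many workers
could make progress at the same time.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy model of per-key state in plain Java. This is NOT Beam code: the
// class and method names are made up, and windows are ignored for brevity.
class KeyedStateSketch {

  // Pairs each value with a key chosen by keyFn, in the spirit of WithKeys.
  static List<Map.Entry<String, String>> withKeys(
      List<String> values, Function<String, String> keyFn) {
    List<Map.Entry<String, String>> keyed = new ArrayList<>();
    for (String value : values) {
      keyed.add(new AbstractMap.SimpleEntry<>(keyFn.apply(value), value));
    }
    return keyed;
  }

  // Keeps one state cell per key; each cell holds a running element count.
  static Map<String, Integer> countPerKey(List<Map.Entry<String, String>> keyed) {
    Map<String, Integer> cells = new LinkedHashMap<>();
    for (Map.Entry<String, String> element : keyed) {
      // Read-modify-write on this key's cell. Updates to the same cell must
      // happen one after another; cells for different keys are independent.
      cells.merge(element.getKey(), 1, Integer::sum);
    }
    return cells;
  }

  // Upper bound on parallelism: the number of independent state cells.
  static int maxParallelism(List<Map.Entry<String, String>> keyed) {
    return countPerKey(keyed).size();
  }
}
```

For the values "a1", "a2", "b1", "c1", keying every value with the constant
"hello" yields a single cell, so maxParallelism is 1 no matter how many
machines are available. Keying by the first character instead yields three
cells ("a", "b", "c"), which could be updated independently.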

The particular error you encountered should instead be clear and
actionable, rather than a NullPointerException. I will follow up with a
JIRA issue.

Kenn

 (if so, does Beam put further requirements upon the K type, e.g. does it
need to implement hashCode or equals in particular ways, or require that
the serialized bytes of the instances of K are equal if and only if the
instances of K should share the same state cell)

In testValueStateSimple in
https://github.com/apache/beam/blob/e31ca8b0d05e47c2588d5db29c92bac49aa410da/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L1615
if I change the DoFn signature:

FROM: DoFn<KV<String, Integer>, Integer>
TO: DoFn<String, Integer>

Then I start getting this error, which is confusing me.  Is this ultimately
caused because the above technical contract is actually required but not
enforced in some kind of validation, or is this something else silly that
I'm doing wrong?  :)

java.lang.NullPointerException: Outputs for non-root node Nl/ParDo(Anonymous)/ParMultiDo(Anonymous) are null
 at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:864)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:490)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:481)
 at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$400(TransformHierarchy.java:231)
 at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:206)
 at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:321)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.recordPipelineNodes(TestPipeline.java:166)
 at org.apache.beam.sdk.testing.TestPipeline$PipelineAbandonedNodeEnforcement.afterPipelineExecution(TestPipeline.java:200)
 at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:314)

Finally, it seems like it would be possible to add state API to the
processing of any arbitrary non-KV PCollection by simply tacking on the
string "hello" like in the unit tests to every value using WithKeys.  I
suspect the answer will probably depend on the runner, but is there a
general intuition that I could gain for what bad thing will happen if I do
this, e.g. will the stateful ParDo be stuck running within a single
machine, or will we run some lower layer out of memory, or will we make the
network traffic between cluster nodes much more chatty and synchronized?




---
Wesley Tanaka
http://wtanaka.com/