Posted to dev@beam.apache.org by Maximilian Michels <mx...@apache.org> on 2018/11/01 09:42:58 UTC

Re: Accessing keyed state in portable timer callbacks

Hi Lukasz,

Thanks for promptly fixing this [1]. I saw that the current element was 
not set correctly when timers are processed, but wanted to make sure any 
changes would be aligned with the harness processing model.
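To make the failure in the stack trace below concrete, here is a simplified, hypothetical Java model. A state accessor builds the state key from a callback that only looks at the current element, so while a timer is being processed it sees null. None of these classes are Beam's actual FnApiDoFnRunner or FnApiStateAccessor. The names KeyedValue, ElementOnlyStateAccessor and BuggyRunner, and the string-based state key, are made up for illustration.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical model of the bug; these are NOT Beam's real classes.

/** Stand-in for a keyed input element or a keyed timer. */
final class KeyedValue {
  final String key;

  KeyedValue(String key) {
    this.key = key;
  }
}

/** Stand-in for the state accessor: it derives the state key from a callback. */
final class ElementOnlyStateAccessor {
  private final Supplier<KeyedValue> currentElement;

  ElementOnlyStateAccessor(Supplier<KeyedValue> currentElement) {
    this.currentElement = currentElement;
  }

  String bagUserStateKey(String stateId) {
    // Rejecting null here plays the role of the NPE thrown by the protobuf
    // builder's setKey(...) in the real stack trace.
    KeyedValue current = Objects.requireNonNull(currentElement.get(), "no current element");
    return stateId + "@" + current.key;
  }
}

/** Hypothetical runner that tracks elements and timers in separate fields. */
final class BuggyRunner {
  private KeyedValue currentElement; // non-null only while processing an element
  private KeyedValue currentTimer;   // non-null only while processing a timer

  // Bug: the callback only consults currentElement, never currentTimer.
  private final ElementOnlyStateAccessor accessor =
      new ElementOnlyStateAccessor(() -> currentElement);

  String processElement(KeyedValue element) {
    currentElement = element;
    try {
      return accessor.bagUserStateKey("buffer");
    } finally {
      currentElement = null;
    }
  }

  String processTimer(KeyedValue timer) {
    currentTimer = timer;
    try {
      // currentElement is null here, so this throws NullPointerException.
      return accessor.bagUserStateKey("buffer");
    } finally {
      currentTimer = null;
    }
  }
}
```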

I think I favor the currentElementOrTimer approach because it makes 
things more explicit, but the solution is fine for now.
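Both options discussed in this thread can be sketched with the same kind of hypothetical model. FirstNonNullRunner has the callback return the first non-null of currentElement/currentTimer. CollapsedRunner keeps a single currentElementOrTimer field, so the accessor only ever reads one thing. The class names and the string-based state key are illustrative assumptions, not Beam code.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical sketch of the two fixes; these are NOT Beam's real classes.

/** Stand-in for a keyed input element or a keyed timer. */
final class KeyedItem {
  final String key;

  KeyedItem(String key) {
    this.key = key;
  }
}

/** Stand-in state accessor that derives the state key from a callback. */
final class KeyedStateAccessor {
  private final Supplier<KeyedItem> current;

  KeyedStateAccessor(Supplier<KeyedItem> current) {
    this.current = current;
  }

  String bagUserStateKey(String stateId) {
    KeyedItem item = Objects.requireNonNull(current.get(), "no current element or timer");
    return stateId + "@" + item.key;
  }
}

/** Short-term fix: the callback returns the first non-null of the two fields. */
final class FirstNonNullRunner {
  private KeyedItem currentElement;
  private KeyedItem currentTimer;
  private final KeyedStateAccessor accessor =
      new KeyedStateAccessor(() -> currentElement != null ? currentElement : currentTimer);

  String processElement(KeyedItem element) {
    currentElement = element;
    try {
      return accessor.bagUserStateKey("buffer");
    } finally {
      currentElement = null;
    }
  }

  String processTimer(KeyedItem timer) {
    currentTimer = timer;
    try {
      return accessor.bagUserStateKey("buffer");
    } finally {
      currentTimer = null;
    }
  }
}

/** Alternative: one field holds whatever is currently being processed. */
final class CollapsedRunner {
  private KeyedItem currentElementOrTimer;
  private final KeyedStateAccessor accessor = new KeyedStateAccessor(() -> currentElementOrTimer);

  private String process(KeyedItem elementOrTimer) {
    currentElementOrTimer = elementOrTimer;
    try {
      return accessor.bagUserStateKey("buffer");
    } finally {
      currentElementOrTimer = null;
    }
  }

  String processElement(KeyedItem element) {
    return process(element);
  }

  String processTimer(KeyedItem timer) {
    return process(timer);
  }
}
```

In this sketch, the collapsed field makes the invariant explicit: only one element or timer is in flight at a time, and the accessor never needs to know which kind it is.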

Thanks,
Max

[1] https://github.com/apache/beam/pull/6902

On 31.10.18 19:09, Lukasz Cwik wrote:
> I filed https://issues.apache.org/jira/browse/BEAM-5930.
> 
> On Wed, Oct 31, 2018 at 10:22 AM Lukasz Cwik <lcwik@google.com> wrote:
> 
>     That looks like a bug in FnApiDoFnRunner.java.
> 
>     The FnApiStateAccessor is given a callback to get the current
>     element, and that callback does not handle the case where the
>     current element is a timer.
> 
>     callback:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L212
>     where the current "element" gets set:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L220
>     where the current "timer" gets set:
>     https://github.com/apache/beam/blob/29c443162a2fe4c89d26336b30aa6e3a3bfbade8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L237
> 
>     The easiest fix would be to have the callback return the first
>     non-null value of currentElement/currentTimer, but longer term I
>     think we'll want a different solution. Alternatively, we could
>     collapse currentElement and currentTimer into a single
>     currentElementOrTimer field, which would solve the accessor issue.
> 
>     On Wed, Oct 31, 2018 at 9:50 AM Maximilian Michels <mxm@apache.org> wrote:
> 
>         Hi,
> 
>         I have a question regarding user state during timer callbacks in
>         the FnApiDoFnRunner (Java SDK Harness).
>
>         I've started implementing timers for the portable Flink Runner.
>         I can register a timer via the timer output collection and fire
>         the timer via the timer input of the SDK Harness. But when I try
>         to access state in the timer callback, I get the exception below.
>
>         Is this a bug? If not, how is the timer's key supposed to be
>         set? I assume that it should be set from the timer element, which
>         contains the key.
> 
>         Thanks,
>         Max
> 
> 
>         Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 72: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>             at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>             at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>             at org.apache.beam.sdk.fn.data.CompletableFutureInboundDataClient.awaitCompletion(CompletableFutureInboundDataClient.java:49)
>             at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:90)
>             at org.apache.beam.fn.harness.BeamFnDataReadRunner.blockTillReadFinishes(BeamFnDataReadRunner.java:185)
>             at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:292)
>             at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:161)
>             at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:145)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>             at java.lang.Thread.run(Thread.java:748)
>         Caused by: java.lang.NullPointerException
>             at org.apache.beam.model.fnexecution.v1.BeamFnApi$StateKey$BagUserState$Builder.setKey(BeamFnApi.java:49694)
>             at org.apache.beam.fn.harness.state.FnApiStateAccessor.createBagUserStateKey(FnApiStateAccessor.java:451)
>             at org.apache.beam.fn.harness.state.FnApiStateAccessor.bindBag(FnApiStateAccessor.java:244)
>             at org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:487)
>             at org.apache.beam.sdk.state.StateSpecs$BagStateSpec.bind(StateSpecs.java:477)
>             at org.apache.beam.fn.harness.FnApiDoFnRunner$OnTimerContext.state(FnApiDoFnRunner.java:671)
>             at StateTest$5$OnTimerInvoker$expiry$ZXhwaXJ5.invokeOnTimer(Unknown Source)
>             at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DoFnInvokerBase.invokeOnTimer(ByteBuddyDoFnInvokerFactory.java:187)
>             at org.apache.beam.fn.harness.FnApiDoFnRunner.processTimer(FnApiDoFnRunner.java:244)
>             at org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.lambda$createRunnerForPTransform$0(DoFnPTransformRunnerFactory.java:134)
>             at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>             at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>             at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>             at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>             at org.apache.beam.sdk.fn.stream.ForwardingClientResponseObserver.onNext(ForwardingClientResponseObserver.java:50)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onMessage(ClientCalls.java:407)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.ForwardingClientCallListener.onMessage(ForwardingClientCallListener.java:33)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1MessagesAvailable.runInContext(ClientCallImpl.java:519)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>             at org.apache.beam.vendor.grpc.v1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 
> 

Re: Accessing keyed state in portable timer callbacks

Posted by Lukasz Cwik <lc...@google.com>.
Yes, I was hoping to merge the logic within the
SplittableProcessElementsRunner back into the FnApiDoFnRunner, since there
is some duplication, and it is the original reason FnApiStateAccessor
exists. Previously, we just referenced the correct object directly within
the class based on the current context
(StartBundle/FinishBundle/ProcessElement/OnTimer).
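A hypothetical sketch of that context-based style: instead of probing nullable fields through a callback, the runner records which callback is running and picks the key source from that. The enum, class and method names are made up for illustration, and the choice to reject keyed state outright in StartBundle/FinishBundle (where no key is in scope) is an assumption of this sketch.

```java
// Hypothetical sketch: select the key source from the current callback context.
// These are NOT Beam's real classes.
enum CallbackContext {
  START_BUNDLE,
  PROCESS_ELEMENT,
  ON_TIMER,
  FINISH_BUNDLE
}

final class ContextAwareRunner {
  private CallbackContext context = CallbackContext.START_BUNDLE;
  private String elementKey; // key of the element being processed
  private String timerKey;   // key of the timer being fired

  void startBundle() {
    context = CallbackContext.START_BUNDLE;
  }

  String processElement(String key) {
    context = CallbackContext.PROCESS_ELEMENT;
    elementKey = key;
    return bagUserStateKey("buffer");
  }

  String onTimer(String key) {
    context = CallbackContext.ON_TIMER;
    timerKey = key;
    return bagUserStateKey("buffer");
  }

  void finishBundle() {
    context = CallbackContext.FINISH_BUNDLE;
  }

  /** Binds state to the key that belongs to the callback currently running. */
  String bagUserStateKey(String stateId) {
    switch (context) {
      case PROCESS_ELEMENT:
        return stateId + "@" + elementKey;
      case ON_TIMER:
        return stateId + "@" + timerKey;
      default:
        // In this sketch no key is in scope during StartBundle/FinishBundle.
        throw new IllegalStateException("keyed state is not available in " + context);
    }
  }
}
```

The explicit context turns a silent null into a clear error when state is requested where no key exists, at the cost of having to keep the context field in sync with every callback.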
