Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/09 02:32:00 UTC

[jira] [Work logged] (BEAM-8157) [Flink Runner 1.9] State requests return wrong state in timers when encoded key is length-prefixed

     [ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=308580&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-308580 ]

ASF GitHub Bot logged work on BEAM-8157:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Sep/19 02:31
            Start Date: 09/Sep/19 02:31
    Worklog Time Spent: 10m 
      Work Description: tweise commented on issue #9484: [BEAM-8157] Remove length prefix from state key for Flink's state backend
URL: https://github.com/apache/beam/pull/9484#issuecomment-529277954
 
 
   This actually fails the test that I had provided in the gist.
   ```
   RuntimeError: java.lang.IllegalArgumentException: Forbidden IOException when reading from InputStream
           at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:118)
           at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray(CoderUtils.java:98)
           at org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils.removeNestedContext(FlinkKeyUtils.java:81)
           at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:351)
           at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:283)
           at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:278)
           at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:435)
           at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:400)
           at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:205)
           at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
           at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
           at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.EOFException: reached end of stream after reading 1 bytes; 107 bytes expected
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:780)
           at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams.readFully(ByteStreams.java:762)
           at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:108)
           at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41)
           at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream(CoderUtils.java:115)
   ```
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 308580)
    Time Spent: 3h 10m  (was: 3h)

> [Flink Runner 1.9] State requests return wrong state in timers when encoded key is length-prefixed
> --------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-8157
>                 URL: https://issues.apache.org/jira/browse/BEAM-8157
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.13.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.16.0
>
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The Flink runner requires the internal key to be encoded without a length prefix (OUTER context). The user state request handler exposes a serialized version of the key to the runner. This key is encoded in the NESTED context, which may add a length prefix. We need to convert it to the OUTER context to match the Flink runner's key encoding.
> So far this has not caused the Flink runner to behave incorrectly. However, with the upcoming support for Flink 1.9, the state backend will not accept requests for keys that are not part of any key group/partition of the operator. That is very likely to happen when the key encoding is inconsistent.
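
The NESTED versus OUTER distinction described above can be sketched in a few lines of Python. This is an illustration only, not Beam's or the Flink runner's code: it assumes a byte-array key whose NESTED encoding is a base-128 varint length followed by the raw bytes, and all function names here (encode_varint, decode_varint, encode_nested, nested_to_outer) are hypothetical. It also shows how decoding a key that has no length prefix as if it had one yields an end-of-stream error of the kind seen in the stack trace. The key b"k1" is a made-up example; the key used in the failing test is not shown in this message.

```python
import io


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as a base-128 varint (low 7 bits first)."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(stream: io.BytesIO) -> int:
    """Read one base-128 varint from the stream."""
    shift = 0
    result = 0
    while True:
        b = stream.read(1)
        if not b:
            raise EOFError("stream ended inside varint")
        result |= (b[0] & 0x7F) << shift
        if not (b[0] & 0x80):
            return result
        shift += 7


def encode_nested(key: bytes) -> bytes:
    """Assumed NESTED form: varint length prefix, then the key bytes."""
    return encode_varint(len(key)) + key


def nested_to_outer(encoded: bytes) -> bytes:
    """Strip the length prefix so only the raw key bytes (OUTER form) remain."""
    stream = io.BytesIO(encoded)
    length = decode_varint(stream)
    payload = stream.read(length)
    if len(payload) != length:
        raise EOFError(
            f"reached end of stream after reading {len(payload)} bytes; "
            f"{length} bytes expected"
        )
    return payload


if __name__ == "__main__":
    key = b"k1"  # hypothetical key
    nested = encode_nested(key)
    print(nested, nested_to_outer(nested))
    try:
        # The key is already in OUTER form, so its first byte is
        # misread as a length and the read runs past the end.
        nested_to_outer(key)
    except EOFError as err:
        print("decode failed:", err)
```

In this sketch the two encodings of the same key differ by the prefix bytes, so a backend that derives the key group from the encoded bytes would see them as different keys. That is why both sides need to agree on one context.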


