Posted to issues@beam.apache.org by "Maximilian Michels (Jira)" <ji...@apache.org> on 2019/11/05 15:57:00 UTC

[jira] [Reopened] (BEAM-8157) Key encoding for state requests is not consistent across SDKs

     [ https://issues.apache.org/jira/browse/BEAM-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels reopened BEAM-8157:
--------------------------------------

Reopening. Unfortunately, this is still an issue; setting the encoding scheme to NESTED by default did not cure it. I'm getting the following error when Python's {{FastPrimitivesCoder}} has been used to encode a key for a state request:

{noformat}
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 168, in _execute
    response = task()
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 201, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in do_instruction
    request.instruction_id)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 667, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
  File "pricingrealtime/event_processing/stateful_event_processing.py", line 55, in process
    recent_events_map = StatefulEventDoFn._load_recent_events_map(recent_events_state)
  File "pricingrealtime/event_processing/stateful_event_processing.py", line 127, in _load_recent_events_map
    items_in_recent_events_bag = [e for e in recent_events_state.read()]
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
    for elem in self.first:
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
    self._underlying.get_raw(state_key, continuation_token)
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
    continuation_token=continuation_token)))
  File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: java.lang.IllegalStateException: The current key '[1, -104, -97, -93, -34, -73, -128, -42, 36]' with key group index '274' does not belong to the key group range 'KeyGroupRange{startKeyGroup=153, endKeyGroup=154}'. Runner KeyCoder: LengthPrefixCoder(ByteArrayCoder). Ptransformid: ref_AppliedPTransform_process_events_with_stateful_dofn_23 Userstateid: recent_events
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:531)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:387)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:309)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:303)
	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:468)
	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:415)
	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:206)
	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
 [while running 'process_events_with_stateful_dofn']
{noformat}

As long as we use standard coders, these errors do not occur. When an SDK-specific coder is used, the nested encoding does not work properly in Python: Python does not add a length prefix, but the Runner side uses a {{LengthPrefixCoder(ByteArrayCoder)}}. The only way to fix this is to remove the length prefix coder on the Runner side and use OUTER encoding for the key in state requests. This gives correct results for both known and unknown coders.
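
To make the mismatch concrete, here is a minimal Python sketch (not the actual SDK or Runner code path) of how the same key comes out of the two contexts. The key bytes are made up, and {{BytesCoder}} stands in for the Java {{ByteArrayCoder}} on the Runner side:

{code:python}
# Minimal sketch of the encoding mismatch. The key bytes are hypothetical and
# BytesCoder stands in for the Java ByteArrayCoder on the Runner side.
from apache_beam.coders.coders import BytesCoder, LengthPrefixCoder

key = b'\x01\x98\x9f\xa3'
key_coder = BytesCoder()

outer = key_coder.encode(key)          # OUTER context: raw key bytes, no prefix
nested = key_coder.encode_nested(key)  # NESTED context: var-int length prefix added

# What the Runner side currently assumes for the key of a state request:
runner_view = LengthPrefixCoder(BytesCoder()).encode(key)

assert outer == key                # b'\x01\x98\x9f\xa3'
assert nested == runner_view       # b'\x04\x01\x98\x9f\xa3'
assert outer != runner_view        # different bytes -> different key group
{code}

If the key group index is derived from the raw (OUTER) bytes on one side and from the length-prefixed bytes on the other, the two sides disagree on which {{KeyGroupRange}} the key belongs to, which is exactly the {{IllegalStateException}} in the stack trace above.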

> Key encoding for state requests is not consistent across SDKs
> -------------------------------------------------------------
>
>                 Key: BEAM-8157
>                 URL: https://issues.apache.org/jira/browse/BEAM-8157
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.13.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.17.0
>
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> The Flink runner requires the internal key to be encoded without a length prefix (OUTER context). The user state request handler exposes a serialized version of the key to the Runner. This key is encoded with the NESTED context which may add a length prefix. We need to convert it to OUTER context to match the Flink runner's key encoding.
> So far this has not caused the Flink Runner to behave incorrectly. However, with the upcoming support for Flink 1.9, the state backend will not accept requests for keys that are not part of any key group/partition of the operator. This is very likely to happen when the encoding is not consistent.
> **NOTE** This is only applicable to the Java SDK, as the Python SDK uses OUTER encoding for the key in state requests.
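
The conversion mentioned in the description above, turning the NESTED-encoded key exposed by the user state request handler into its OUTER form, boils down to decoding and re-encoding with the key coder. A rough Python illustration (the real Runner-side code is Java, and {{BytesCoder}} here is an assumed key coder):

{code:python}
# Illustrative only: re-encode a NESTED key into the OUTER context that the
# Flink runner's keyed state backend expects. BytesCoder is an assumed key coder.
from apache_beam.coders.coders import BytesCoder

key_coder = BytesCoder()

nested_key = key_coder.encode_nested(b'\x01\x98\x9f\xa3')          # length-prefixed
outer_key = key_coder.encode(key_coder.decode_nested(nested_key))  # raw key bytes

assert outer_key == b'\x01\x98\x9f\xa3'
{code}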



--
This message was sent by Atlassian Jira
(v8.3.4#803005)