Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/11/05 16:59:00 UTC

[jira] [Work logged] (BEAM-8157) Key encoding for state requests is not consistent across SDKs

     [ https://issues.apache.org/jira/browse/BEAM-8157?focusedWorklogId=338866&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-338866 ]

ASF GitHub Bot logged work on BEAM-8157:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Nov/19 16:58
            Start Date: 05/Nov/19 16:58
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #9997: [BEAM-8157] Revert key encoding changes for state requests / improve debugging and testing
URL: https://github.com/apache/beam/pull/9997
 
 
   Without this patch, I get the following error when Python's FastPrimitivesCoder is used to encode a key for a state request:
   
   ```
   Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 4: Traceback (most recent call last):
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 168, in _execute
       response = task()
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 201, in <lambda>
       self._execute(lambda: worker.do_instruction(work), work)
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 356, in do_instruction
       request.instruction_id)
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in process_bundle
       bundle_processor.process_bundle(instruction_id))
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 667, in process_bundle
       data.ptransform_id].process_encoded(data.data)
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
       def output(self, windowed_value, output_index=0):
     File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
       cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
     File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
       self.consumer.process(windowed_value)
     File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
       with self.scoped_process_state:
     File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
       delayed_application = self.dofn_receiver.receive(o)
     File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
       self.process(windowed_value)
     File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
       self._reraise_augmented(exn)
     File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
       raise_with_traceback(new_exn)
     File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
       return self.do_fn_invoker.invoke_process(windowed_value)
     File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
       self._invoke_process_per_window(
     File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
       output_processor.process_outputs(
     File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
       def process_outputs(self, windowed_input_element, results):
     File "apache_beam/runners/common.py", line 895, in apache_beam.runners.common._OutputProcessor.process_outputs
       for result in results:
     File "pricingrealtime/event_processing/stateful_event_processing.py", line 55, in process
       recent_events_map = StatefulEventDoFn._load_recent_events_map(recent_events_state)
     File "pricingrealtime/event_processing/stateful_event_processing.py", line 127, in _load_recent_events_map
       items_in_recent_events_bag = [e for e in recent_events_state.read()]
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 335, in __iter__
       for elem in self.first:
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 723, in _materialize_iter
       self._underlying.get_raw(state_key, continuation_token)
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 603, in get_raw
       continuation_token=continuation_token)))
     File "/srv/venvs/service/trusty/service_venv/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in _blocking_request
       raise RuntimeError(response.error)
   RuntimeError: java.lang.IllegalStateException: The current key '[1, -104, -97, -93, -34, -73, -128, -42, 36]' with key group index '274' does not belong to the key group range 'KeyGroupRange{startKeyGroup=153, endKeyGroup=154}'. Runner KeyCoder: LengthPrefixCoder(ByteArrayCoder). Ptransformid: ref_AppliedPTransform_process_events_with_stateful_dofn_23 Userstateid: recent_events
   	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:531)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.prepareStateBackend(ExecutableStageDoFnOperator.java:387)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:309)
   	at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator$BagUserStateFactory$1.get(ExecutableStageDoFnOperator.java:303)
   	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handleGetRequest(StateRequestHandlers.java:468)
   	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$ByteStringStateRequestHandlerToBagUserStateHandlerFactoryAdapter.handle(StateRequestHandlers.java:415)
   	at org.apache.beam.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(StateRequestHandlers.java:206)
   	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:130)
   	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onNext(GrpcStateService.java:118)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
    [while running 'process_events_with_stateful_dofn']
   ```
   
   As long as standard coders are used, these errors do not occur. When an
   SDK-specific coder is used, the nested encoding does not work properly in
   Python: Python does not add a length prefix, but the Runner side uses a
   LengthPrefixCoder(ByteArrayCoder). The only way to fix this is to remove the
   length prefix coder on the Runner side and use OUTER encoding for the key in
   state requests. This gives correct results for both known and unknown coders.
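
The difference between the two contexts can be sketched in plain Python, without the Beam SDK. This is only an illustration under stated assumptions: the helper names (`encode_varint`, `encode_outer`, `encode_nested`, `decode_nested`) are hypothetical and are not Beam APIs, the length prefix is modeled as a base-128 varint, and the key bytes are the ones from the error message above, converted from Java's signed bytes to unsigned values.

```python
# Key bytes from the error message, as printed by Java (signed bytes).
JAVA_SIGNED_KEY = [1, -104, -97, -93, -34, -73, -128, -42, 36]
# Convert to unsigned values so they can be stored in a Python bytes object.
KEY = bytes(b & 0xFF for b in JAVA_SIGNED_KEY)


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint (hypothetical helper)."""
    if n < 0:
        raise ValueError("length must be non-negative")
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)
            return bytes(out)


def encode_outer(payload: bytes) -> bytes:
    """OUTER context in this sketch: the payload is written as-is."""
    return payload


def encode_nested(payload: bytes) -> bytes:
    """NESTED context in this sketch: a varint length prefix, then the payload."""
    return encode_varint(len(payload)) + payload


def decode_nested(data: bytes) -> bytes:
    """Read a varint length prefix and return that many payload bytes."""
    length = 0
    shift = 0
    pos = 0
    while True:
        b = data[pos]
        pos += 1
        length |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return data[pos:pos + length]


if __name__ == "__main__":
    print("outer :", list(encode_outer(KEY)))
    print("nested:", list(encode_nested(KEY)))
    # Reading un-prefixed bytes as if they carried a length prefix
    # treats the first byte as the length and yields a different key.
    print("misread:", list(decode_nested(KEY)))
```

In this sketch `encode_nested(KEY)` is one byte longer than `encode_outer(KEY)`, and `decode_nested(KEY)` returns a single byte rather than the original key, so any partitioning computed over the bytes would see a different key on each side.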
   
   Effectively reverts #9484.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 338866)
    Time Spent: 6h 40m  (was: 6.5h)

> Key encoding for state requests is not consistent across SDKs
> -------------------------------------------------------------
>
>                 Key: BEAM-8157
>                 URL: https://issues.apache.org/jira/browse/BEAM-8157
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.13.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.17.0
>
>          Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> The Flink runner requires the internal key to be encoded without a length prefix (OUTER context). The user state request handler exposes a serialized version of the key to the Runner. This key is encoded with the NESTED context which may add a length prefix. We need to convert it to OUTER context to match the Flink runner's key encoding.
> So far this has not caused the Flink Runner to behave incorrectly. However, with the upcoming support for Flink 1.9, the state backend will reject requests for keys that are not part of any key group/partition of the operator. This is very likely to happen when the key encoding is inconsistent.
> **NOTE** This is only applicable to the Java SDK, as the Python SDK uses OUTER encoding for the key in state requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)