Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2023/02/13 11:51:00 UTC

[jira] [Created] (FLINK-31043) KeyError exception is thrown in CachedMapState

Dian Fu created FLINK-31043:
-------------------------------

             Summary: KeyError exception is thrown in CachedMapState
                 Key: FLINK-31043
                 URL: https://issues.apache.org/jira/browse/FLINK-31043
             Project: Flink
          Issue Type: Improvement
          Components: API / Python
    Affects Versions: 1.15.3
            Reporter: Dian Fu


We have seen the following exception in a PyFlink job running on Flink 1.15. It happens occasionally and may indicate a bug in the state cache of MapState (see the sketch after the stack trace):
{code:java}
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1875)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1846)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2010)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1999)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1079)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1028)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.lang.Thread.run(Thread.java:834)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
	... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2008)
	... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:387)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:371)
	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
	... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 170, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 417, in finish_bundle
    return self.group_agg_function.finish_bundle()
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 597, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 652, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 389, in pyflink.fn_execution.table.aggregate_fast.SimpleTableAggsHandleFunction.emit_value
  File "/tmp/pyflink/17360444-8c0b-46a5-90a4-689c376ea4ed/0e2967b5-181c-4663-bd7a-267d47509cf5/whms_dws_stock_python_sps_1_output.py", line 29, in emit_value
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/state_data_view.py", line 147, in get
    return self._map_state.get(key)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
    return self.get_internal_state().get(key)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 773, in get
    self._state_key, map_key, self._map_key_encoder, self._map_value_decoder)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
    cached_map_state.put(map_key, (exists, value))
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
    super(CachedMapState, self).put(key, exists_and_value)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
    self._on_evict(name, value)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
    self._cached_keys.remove(key)
KeyError: 'SPAREPARTS_M11F010L4L1_01'

	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:385)
	... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle
    element.data)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 228, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 357, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 359, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 221, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 158, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 170, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 417, in finish_bundle
    return self.group_agg_function.finish_bundle()
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 597, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 652, in pyflink.fn_execution.table.aggregate_fast.GroupTableAggFunction.finish_bundle
  File "pyflink/fn_execution/table/aggregate_fast.pyx", line 389, in pyflink.fn_execution.table.aggregate_fast.SimpleTableAggsHandleFunction.emit_value
  File "/tmp/pyflink/17360444-8c0b-46a5-90a4-689c376ea4ed/0e2967b5-181c-4663-bd7a-267d47509cf5/whms_dws_stock_python_sps_1_output.py", line 29, in emit_value
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/table/state_data_view.py", line 147, in get
    return self._map_state.get(key)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
    return self.get_internal_state().get(key)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 773, in get
    self._state_key, map_key, self._map_key_encoder, self._map_value_decoder)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
    cached_map_state.put(map_key, (exists, value))
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
    super(CachedMapState, self).put(key, exists_and_value)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
    self._on_evict(name, value)
  File "/usr/local/python3/lib/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
    self._cached_keys.remove(key)
KeyError: 'SPAREPARTS_M11F010L4L1_01'

	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	... 3 more
{code}
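
From the bottom frames of the trace, the failure happens on the eviction path of CachedMapState: on_evict calls {{self._cached_keys.remove(key)}} for a key that is not present in {{_cached_keys}}, i.e. the key set has drifted out of sync with the LRU cache it is supposed to mirror. The following minimal sketch shows one way such bookkeeping can diverge. The class names mirror state_impl.py, but the implementation (the {{LRUCache}} base, the {{_evict}} callback, and the negative-entry scenario at the bottom) is a deliberately simplified, hypothetical model, not the actual PyFlink code:
{code:python}
from collections import OrderedDict


class LRUCache(object):
    """Minimal LRU cache with an eviction callback (hypothetical
    simplification, not the real pyflink.fn_execution.state_impl code)."""

    def __init__(self, max_entries, on_evict):
        self._max_entries = max_entries
        self._on_evict = on_evict
        self._cache = OrderedDict()

    def put(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)
        while len(self._cache) > self._max_entries:
            # Pop the least recently used entry and notify the callback.
            name, value = self._cache.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(name, value)


class CachedMapState(LRUCache):
    """Caches (exists, value) pairs and tracks the keys known to exist
    in a secondary set, as the stack trace suggests the real class does."""

    def __init__(self, max_entries):
        super(CachedMapState, self).__init__(max_entries, self._evict)
        self._cached_keys = set()

    def _evict(self, key, value):
        # Raises KeyError when `key` was never added to (or was already
        # dropped from) _cached_keys -- the symptom in the trace above.
        self._cached_keys.remove(key)

    def put(self, key, exists_and_value):
        exists, _ = exists_and_value
        if exists:
            self._cached_keys.add(key)
        super(CachedMapState, self).put(key, exists_and_value)


# One way the bookkeeping can diverge: a cached "key does not exist"
# entry occupies an LRU slot but is never added to _cached_keys, so
# evicting it hits set.remove() with an unknown key.
state = CachedMapState(max_entries=2)
state.put('a', (False, None))  # negative cache entry, not in _cached_keys
state.put('b', (True, 1))
state.put('c', (True, 2))      # evicts 'a' -> KeyError: 'a'
{code}
Whatever the exact trigger is in production, a defensive fix would be to use {{self._cached_keys.discard(key)}} in the eviction callback, which is a no-op for missing keys; the actual cause of the divergence should still be tracked down, since a stale key set could also make cache lookups report wrong results.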
