Posted to user@flink.apache.org by Curt Buechter <tr...@gmail.com> on 2021/09/22 16:29:26 UTC
pyflink keyed stream checkpoint error
Hi,
I'm getting an error after enabling checkpointing in my pyflink application
that uses a keyed stream and rocksdb state.
Here is the error message:
2021-09-22 16:18:14,408 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] - Closed RocksDB State Backend. Cleaning up RocksDB working directory /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
2021-09-22 16:18:14,409 WARN  org.apache.flink.runtime.taskmanager.Task [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
    ... 19 more
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
    ... 28 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
2021-09-22 16:18:14,410 INFO  org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831).
2021-09-22 16:18:14,411 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task KEYED PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
2021-09-22 16:18:14,426 INFO  org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
2021-09-22 16:18:14,428 INFO  org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34.
2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-09-22 16:18:14,430 INFO  org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
And, here is some source code:
class DedupeFunction(FlatMapFunction):
    def __init__(self, schema):
        super().__init__()
        self.logger = None
        self.state = None
        self.my_state = None
        self.schema = schema
        self.metric_columns = [c.column_name for c in schema.columns
                               if c.is_metric]

    def open(self, runtime_context: RuntimeContext):
        self.logger = logging
        self.logger.info('Opening the FlatMapFunction')
        descriptor = MapStateDescriptor("my_map_state_descriptor",
                                        Types.PICKLED_BYTE_ARRAY(),
                                        Types.PICKLED_BYTE_ARRAY())
        self.state = runtime_context.get_map_state(descriptor)

    def flat_map(self, value):
        try:
            if not self.state.is_empty():
                # self.logger.info('key in state')
                previous_dict = {}
                for k, v in self.state.items():
                    # reverse the metric columns
                    if k in self.metric_columns:
                        if v:
                            v = -v
                    previous_dict[k] = v
                yield Row(**previous_dict)
            new_dict = value.as_dict()
            self.state.put_all(new_dict.items())
            yield value
        except Exception as ex:
            self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')


class PrimaryKeySelector(KeySelector):
    def __init__(self, primary_key):
        self.__primary_key__ = primary_key

    def get_key(self, kv_obj):
        return kv_obj.as_dict().get(self.__primary_key__)


backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
self.__env__.set_state_backend(backend)

input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__),
                           key_type_info=primary_key_type_info)
deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__),
                               output_type=type_info)
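As an aside, the retraction step inside flat_map (negate the metric columns of the previous record before re-emitting it) is a pure function and can be unit-tested without Flink state. A sketch, with a hypothetical helper name and column names:

```python
def build_retraction(previous: dict, metric_columns: set) -> dict:
    """Return the previous record with its metric columns negated,
    so a downstream aggregate can subtract the stale values."""
    retracted = {}
    for k, v in previous.items():
        # Only metric columns are negated; zero/None values pass through.
        if k in metric_columns and v:
            v = -v
        retracted[k] = v
    return retracted


# Hypothetical record: 'amount' is a metric, 'id' and 'name' are not.
prev = {"id": 1, "name": "a", "amount": 5}
retraction = build_retraction(prev, {"amount"})
# retraction == {"id": 1, "name": "a", "amount": -5}
```

Testing this logic outside the job is a quick way to confirm the failure is in the Flink/Beam bridge rather than in the function body.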
This program works fine if checkpointing is not enabled. Any advice here?
Thanks
Re: pyflink keyed stream checkpoint error
Posted by Dian Fu <di...@gmail.com>.
Hi Curt,
Could you try reducing python.fn-execution.bundle.size to 1000 or 100 and
see whether that helps?
Regards,
Dian
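For reference, these bundle options are set through the Flink configuration; the values below are illustrative starting points, not recommendations:

```yaml
# flink-conf.yaml
python.fn-execution.bundle.size: 1000   # elements per bundle (default 100000)
python.fn-execution.bundle.time: 1000   # max ms before a bundle is flushed (default 1000)
```

Smaller bundles mean less buffered data to flush when a checkpoint barrier arrives, at the cost of more frequent Python/JVM round trips.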
On Thu, Oct 14, 2021 at 2:47 AM Curt Buechter <tr...@gmail.com> wrote:
> On Thu, Sep 23, 2021 at 9:07 PM Dian Fu <di...@gmail.com> wrote:
>
>> I agree with Roman that it seems that the Python process has crashed.
>>
>> Besides the suggestions from Roman, I guess you could also try to
>> configure the bundle size to smaller value via
>> “python.fn-execution.bundle.size”.
>>
>> Regards,
>> Dian
>>
>> > 2021年9月24日 上午3:48,Roman Khachatryan <ro...@apache.org> 写道:
>> >
>> > Hi,
>> >
>> > Is it possible that the python process crashed or hung up? (probably
>> > performing a snapshot)
>> > Could you validate this by checking the OS logs for OOM killer
>> > messages or process status?
>> >
>> > Regards,
>> > Roman
>> >
Re: pyflink keyed stream checkpoint error
Posted by Curt Buechter <tr...@gmail.com>.
Hi guys,
I'm still running into this problem. I checked the logs, and there is no
evidence that the python process crashed. I checked the process IDs and
they are still active after the error. No `killed process` messages in
/var/log/messages.
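The checks above can be scripted; a sketch (log locations vary by distro, and `dmesg` may require elevated privileges):

```shell
# Look for OOM-killer activity in the kernel ring buffer and syslog.
dmesg 2>/dev/null | grep -iE 'out of memory|killed process' \
    || echo "no OOM evidence in dmesg"
grep -i 'killed process' /var/log/messages /var/log/syslog 2>/dev/null \
    || echo "no OOM evidence in syslog"

# Confirm the Beam Python worker processes are still alive.
pgrep -af 'pyflink|beam' || echo "no python worker processes found"
```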
I don't think it's necessarily related to checkpointing. I noticed
https://issues.apache.org/jira/browse/FLINK-24123 and thought it was
possibly related. I tried upgrading to Flink 1.14.0 and got (mostly) the
same error, though now it happens outside the context of the
checkpointing operation.
I tried reducing python.fn-execution.bundle.size to 10,000 (default
100,000), and no luck there, either.
2021-10-13 13:39:19
java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:361)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByCount(AbstractPythonFunctionOperator.java:321)
    at org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.processElement(AbstractOneInputPythonFunctionOperator.java:139)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.processElement(PythonKeyedProcessOperator.java:176)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:377)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:361)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$invokeFinishBundle$2(AbstractPythonFunctionOperator.java:340)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
    at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
    ... 7 more
Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    ... 3 more
On Thu, Sep 23, 2021 at 9:07 PM Dian Fu <di...@gmail.com> wrote:
> I agree with Roman that it seems that the Python process has crashed.
>
> Besides the suggestions from Roman, I guess you could also try to
> configure the bundle size to a smaller value via
> “python.fn-execution.bundle.size”.
>
> Regards,
> Dian
>
> > On Sep 24, 2021, at 3:48 AM, Roman Khachatryan <ro...@apache.org> wrote:
> >
> > Hi,
> >
> > Is it possible that the python process crashed or hung up? (probably
> > performing a snapshot)
> > Could you validate this by checking the OS logs for OOM killer
> > messages or process status?
> >
> > Regards,
> > Roman
> >
> > On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tr...@gmail.com>
> wrote:
> >>
> >> Hi,
> >> I'm getting an error after enabling checkpointing in my pyflink
> application that uses a keyed stream and rocksdb state.
> >>
> >> Here is the error message:
> >>
> >> 2021-09-22 16:18:14,408 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] -
> Closed RocksDB State Backend. Cleaning up RocksDB working directory
> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> >> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task
> [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34
> (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with
> failure cause: java.io.IOException: Could not perform checkpoint 2 for
> operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> >> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> >> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> >> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> >> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> >> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
> >> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
> >> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> >> ... 19 more
> >> Caused by: java.util.concurrent.ExecutionException:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: cancelled before receiving half close
> >> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
> >> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
> >> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
> >> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
> >> ... 28 more
> >> Caused by:
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException:
> CANCELLED: cancelled before receiving half close
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >> at
> org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> ... 1 more
> >>
> >> 2021-09-22 16:18:14,410 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34
> (8f4fd40e863dd058822060dc3cf98831).
> >> 2021-09-22 16:18:14,411 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task
> and sending final execution state FAILED to JobManager for task KEYED
> PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Attempting to cancel task Source: Custom Source ->
> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Source: Custom Source -> _stream_key_by_map_operator (1/1)#34
> (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Triggering cancellation of task code Source: Custom Source ->
> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,428 INFO org.apache.flink.runtime.taskmanager.Task
> [] - Ignoring checkpoint aborted notification for non-running task Source:
> Custom Source -> _stream_key_by_map_operator (1/1)#34.
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics []
> - Metrics scheduler closed
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics []
> - Closing reporter org.apache.kafka.common.metrics.JmxReporter
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics []
> - Metrics reporters closed
> >>
> >>
> >> And, here is some source code:
> >>
> >> class DedupeFunction(FlatMapFunction):
> >>     def __init__(self, schema):
> >>         super().__init__()
> >>         self.logger = None
> >>         self.state = None
> >>         self.my_state = None
> >>         self.schema = schema
> >>         self.metric_columns = [c.column_name for c in schema.columns if c.is_metric]
> >>
> >>     def open(self, runtime_context: RuntimeContext):
> >>         self.logger = logging
> >>         self.logger.info('Opening the FlatMapFunction')
> >>         descriptor = MapStateDescriptor("my_map_state_descriptor", Types.PICKLED_BYTE_ARRAY(), Types.PICKLED_BYTE_ARRAY())
> >>         self.state = runtime_context.get_map_state(descriptor)
> >>
> >>     def flat_map(self, value):
> >>         try:
> >>             if not self.state.is_empty():
> >>                 # self.logger.info('key in state')
> >>                 previous_dict = {}
> >>                 for k, v in self.state.items():
> >>                     # reverse the metric columns
> >>                     if k in self.metric_columns:
> >>                         if v:
> >>                             v = -v
> >>                     previous_dict[k] = v
> >>                 yield Row(**previous_dict)
> >>             new_dict = value.as_dict()
> >>             self.state.put_all(new_dict.items())
> >>             yield value
> >>         except Exception as ex:
> >>             self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')
> >>
> >> class PrimaryKeySelector(KeySelector):
> >>
> >>     def __init__(self, primary_key):
> >>         self.__primary_key__ = primary_key
> >>
> >>     def get_key(self, kv_obj):
> >>         return kv_obj.as_dict().get(self.__primary_key__)
> >>
> >>
> >> backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
> >> self.__env__.set_state_backend(backend)
> >>
> >> input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__), key_type_info=primary_key_type_info)
> >> deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__), output_type=type_info)
> >>
> >> This program works fine if checkpointing is not enabled. Any advice here?
> >>
> >>
> >> Thanks
>
>
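[Editorial aside for readers skimming the archive: the retract-style logic in the quoted flat_map, which emits a sign-reversed copy of the previously stored row before storing and emitting the new one, can be sketched as plain Python with no Flink state involved. All names below are illustrative, not from the thread.]

```python
def dedupe_step(previous, new_row, metric_columns):
    """One step of the retract-style dedupe pattern: given the previously
    stored row (or None) and the incoming row, return (rows_to_emit,
    new_state). Metric values in the old row are negated so a downstream
    aggregation cancels the earlier contribution."""
    out = []
    if previous is not None:
        # Mirror the quoted flat_map: only truthy metric values are negated.
        out.append({k: (-v if k in metric_columns and v else v)
                    for k, v in previous.items()})
    out.append(new_row)
    return out, new_row
```

With state {'id': 1, 'amt': 5} and an incoming row {'id': 1, 'amt': 7}, one step emits the retraction {'id': 1, 'amt': -5} followed by the new row.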
Re: pyflink keyed stream checkpoint error
Posted by Dian Fu <di...@gmail.com>.
PS: There is more information about this configuration at https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#python-fn-execution-bundle-size
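[Editorial note: the thread never shows the actual setting, so purely as an illustration (the key names are real Flink options, but the values are made up and not a recommendation), lowering the bundle size cluster-wide could look like this in flink-conf.yaml:]

```yaml
# flink-conf.yaml -- smaller bundles mean the Python worker is flushed
# more often, so less buffered work has to complete when a checkpoint
# barrier arrives. Values below are illustrative only.
python.fn-execution.bundle.size: 1000   # elements per bundle
python.fn-execution.bundle.time: 1000   # ms; companion time-based bound
```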
> [Dian's earlier reply, Roman's message, and the full original post were quoted here verbatim; trimmed as duplicates of the messages earlier in the thread.]
Re: pyflink keyed stream checkpoint error
Posted by Curt Buechter <tr...@gmail.com>.
Guess my last reply didn't go through, so here goes again...
Possibly, but I don't think so. Since I submitted this, I have done some
more testing. It works fine with file system or memory state backends, but
not with rocksdb. I will try again and check the logs, though.
I've also tested rocksdb checkpointing on other jobs, and it works fine.
But when I combine rocksdb with the keyed stream, it fails.
Thanks for the suggestions, I'll look into them.
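[Editorial aside: Roman's suggestion to check the OS logs for OOM-killer activity can be scripted. A minimal sketch follows; the sample log line uses the common Linux kernel format, which is an assumption, not output from this job.]

```python
import re

def find_oom_kills(kernel_log: str):
    """Return (pid, process_name) pairs for OOM-killer victims found in
    kernel-log text, e.g. the output of `dmesg`."""
    pattern = re.compile(r"Killed process (\d+) \(([^)]+)\)")
    return [(int(pid), name) for pid, name in pattern.findall(kernel_log)]

# A crashed Python worker would show up as a killed "python" process:
sample = "Out of memory: Killed process 31337 (python) total-vm:1024kB"
print(find_oom_kills(sample))  # [(31337, 'python')]
```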
On Thu, Sep 23, 2021 at 9:07 PM Dian Fu <di...@gmail.com> wrote:
> I agree with Roman that it seems that the Python process has crashed.
>
> Besides the suggestions from Roman, I guess you could also try to
> configure the bundle size to a smaller value via
> “python.fn-execution.bundle.size”.
>
> Regards,
> Dian
>
> > 2021年9月24日 上午3:48,Roman Khachatryan <ro...@apache.org> 写道:
> >
> > Hi,
> >
> > Is it possible that the python process crashed or hung up? (probably
> > performing a snapshot)
> > Could you validate this by checking the OS logs for OOM killer
> > messages or process status?
> >
> > Regards,
> > Roman
> >
> > On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tr...@gmail.com>
> wrote:
> >>
> >> Hi,
> >> I'm getting an error after enabling checkpointing in my pyflink
> application that uses a keyed stream and rocksdb state.
> >>
> >> Here is the error message:
> >>
> >> 2021-09-22 16:18:14,408 INFO
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend [] -
> Closed RocksDB State Backend. Cleaning up RocksDB working directory
> /mnt/yarn/usercache/hadoop/appcache/application_1631886817823_0058/flink-io-db923140-fce6-433e-b26b-b7d53afbc38f/job_383b888cc0d49bbd7464906e45b88187_op_PythonKeyedProcessOperator_90bea66de1c231edf33913ecd54406c1__1_1__uuid_16eefb37-8bae-43ef-b680-d6a4f9660c39.
> >> 2021-09-22 16:18:14,409 WARN org.apache.flink.runtime.taskmanager.Task
> [] - KEYED PROCESS -> Sink: Unnamed (1/1)#34
> (8f4fd40e863dd058822060dc3cf98831) switched from RUNNING to FAILED with
> failure cause: java.io.IOException: Could not perform checkpoint 2 for
> operator KEYED PROCESS -> Sink: Unnamed (1/1)#34.
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
> >> at
> org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
> >> at org.apache.flink.streaming.runtime.io
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> >> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
> >> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
> >> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> >> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:383)
> >> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:331)
> >> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:320)
> >> at
> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.prepareSnapshotPreBarrier(AbstractPythonFunctionOperator.java:175)
> >> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:415)
> >> at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:292)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
> >> ... 19 more
> >> Caused by: java.util.concurrent.ExecutionException: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
> >> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> >> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
> >> at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
> >> at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
> >> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:381)
> >> ... 28 more
> >> Caused by: org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusRuntimeException: CANCELLED: cancelled before receiving half close
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status.asRuntimeException(Status.java:524)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:275)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:353)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:341)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:867)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> >> at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >> ... 1 more
> >>
> >> 2021-09-22 16:18:14,410 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for KEYED PROCESS -> Sink: Unnamed (1/1)#34 (8f4fd40e863dd058822060dc3cf98831).
> >> 2021-09-22 16:18:14,411 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task KEYED PROCESS -> Sink: Unnamed (1/1)#34 8f4fd40e863dd058822060dc3cf98831.
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831) switched from RUNNING to CANCELING.
> >> 2021-09-22 16:18:14,426 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code Source: Custom Source -> _stream_key_by_map_operator (1/1)#34 (290905523bfbf344381b7a04a8d36831).
> >> 2021-09-22 16:18:14,428 INFO org.apache.flink.runtime.taskmanager.Task [] - Ignoring checkpoint aborted notification for non-running task Source: Custom Source -> _stream_key_by_map_operator (1/1)#34.
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
> >> 2021-09-22 16:18:14,430 INFO org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
> >>
> >>
> >> And, here is some source code:
> >>
> >> class DedupeFunction(FlatMapFunction):
> >>     def __init__(self, schema):
> >>         super().__init__()
> >>         self.logger = None
> >>         self.state = None
> >>         self.my_state = None
> >>         self.schema = schema
> >>         self.metric_columns = [c.column_name for c in schema.columns if c.is_metric]
> >>
> >>     def open(self, runtime_context: RuntimeContext):
> >>         self.logger = logging
> >>         self.logger.info('Opening the FlatMapFunction')
> >>         descriptor = MapStateDescriptor("my_map_state_descriptor", Types.PICKLED_BYTE_ARRAY(), Types.PICKLED_BYTE_ARRAY())
> >>         self.state = runtime_context.get_map_state(descriptor)
> >>
> >>     def flat_map(self, value):
> >>         try:
> >>             if not self.state.is_empty():
> >>                 # self.logger.info('key in state')
> >>                 previous_dict = {}
> >>                 for k, v in self.state.items():
> >>                     # reverse the metric columns
> >>                     if k in self.metric_columns:
> >>                         if v:
> >>                             v = -v
> >>                     previous_dict[k] = v
> >>                 yield Row(**previous_dict)
> >>             new_dict = value.as_dict()
> >>             self.state.put_all(new_dict.items())
> >>             yield value
> >>         except Exception as ex:
> >>             self.logger.error(f'ERROR in dedupe_datastream: {str(ex)}')
> >>
> >> class PrimaryKeySelector(KeySelector):
> >>     def __init__(self, primary_key):
> >>         self.__primary_key__ = primary_key
> >>
> >>     def get_key(self, kv_obj):
> >>         return kv_obj.as_dict().get(self.__primary_key__)
> >>
> >> backend = RocksDBStateBackend(self.__conf__.get('CHECKPOINT_DIR'))
> >> self.__env__.set_state_backend(backend)
> >>
> >> input_ds = input_ds.key_by(PrimaryKeySelector(self.__primary_key__), key_type_info=primary_key_type_info)
> >> deduped_ds = input_ds.flat_map(DedupeFunction(self.__schema__), output_type=type_info)
> >>
> >> This program works fine if checkpointing is not enabled. Any advice here?
> >>
> >> Thanks
>
>
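The reversal logic in DedupeFunction.flat_map can be exercised outside Flink with a plain dict standing in for MapState. A minimal, standalone sketch (the record fields and metric-column names here are made up for illustration):

```python
# Standalone sketch of the dedupe/reversal step: on a repeated key, emit the
# previously stored record with its metric columns negated, then the new one.
# A plain dict stands in for Flink's MapState; field names are illustrative.

def dedupe(state, new_record, metric_columns):
    """Yield a reversal of the stored record (if any), then the new record."""
    if state:  # mirrors `not self.state.is_empty()`
        reversal = {}
        for k, v in state.items():
            if k in metric_columns and v:
                v = -v  # reverse the metric columns
            reversal[k] = v
        yield reversal
    state.update(new_record)  # mirrors `self.state.put_all(...)`
    yield dict(new_record)

state = {}
first = {"id": 1, "amount": 10}
second = {"id": 1, "amount": 25}
print(list(dedupe(state, first, {"amount"})))   # [{'id': 1, 'amount': 10}]
print(list(dedupe(state, second, {"amount"})))  # [{'id': 1, 'amount': -10}, {'id': 1, 'amount': 25}]
```

A downstream aggregator that sums the emitted rows then nets out the superseded record, which is the point of the reversal row.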
Re: pyflink keyed stream checkpoint error
Posted by Dian Fu <di...@gmail.com>.
I agree with Roman that it seems that the Python process has crashed.
Besides the suggestions from Roman, I guess you could also try to configure the bundle size to a smaller value via “python.fn-execution.bundle.size”.
Regards,
Dian
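For reference, that option can be supplied through the Flink configuration; a sketch (the value 1000 is only an example — smaller bundles mean less buffered data to flush when a checkpoint barrier reaches the Python operator):

```yaml
# flink-conf.yaml (or pass with -D on `flink run`) — example value only;
# the default bundle size is much larger (100000 in recent Flink releases)
python.fn-execution.bundle.size: 1000
```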
> 2021年9月24日 上午3:48,Roman Khachatryan <ro...@apache.org> 写道:
>
> Hi,
>
> Is it possible that the python process crashed or hung up? (probably
> performing a snapshot)
> Could you validate this by checking the OS logs for OOM killer
> messages or process status?
>
> Regards,
> Roman
>
> On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tr...@gmail.com> wrote:
Re: pyflink keyed stream checkpoint error
Posted by Roman Khachatryan <ro...@apache.org>.
Hi,
Is it possible that the python process crashed or hung up? (probably
performing a snapshot)
Could you validate this by checking the OS logs for OOM killer
messages or process status?
Regards,
Roman
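Acting on that suggestion — scanning kernel/syslog output for OOM-killer activity — can be sketched in a few lines (the regex and the sample line are assumptions, not exact kernel message formats):

```python
# Illustrative helper: scan kernel/syslog text (e.g. captured `dmesg` output)
# for lines that look like OOM-killer activity. Patterns are assumptions.
import re

OOM_PATTERN = re.compile(r"out of memory|oom-killer|killed process", re.IGNORECASE)

def find_oom_kills(log_text):
    """Return the lines of `log_text` that look like OOM-killer messages."""
    return [line for line in log_text.splitlines() if OOM_PATTERN.search(line)]

sample = "kernel: Out of memory: Killed process 4242 (python) total-vm:8388608kB"
print(find_oom_kills(sample))  # the sample line matches
```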
On Wed, Sep 22, 2021 at 6:30 PM Curt Buechter <tr...@gmail.com> wrote: