Posted to issues@flink.apache.org by "wangzhewenyi (Jira)" <ji...@apache.org> on 2022/11/08 04:34:00 UTC

[jira] [Commented] (FLINK-5463) RocksDB.disposeInternal does not react to interrupts, blocks task cancellation

    [ https://issues.apache.org/jira/browse/FLINK-5463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630174#comment-17630174 ] 

wangzhewenyi commented on FLINK-5463:
-------------------------------------

We hit the same problem. We run Flink 1.11 and submit jobs in standalone mode. *An error occurs when an SST file in the Linux /tmp directory is deleted.* *As the Flink thread stack below shows, the task thread then hangs in disposeInternal. As a result, the Flink task cannot be canceled and new tasks cannot be started.* Flink should handle this error gracefully instead of hanging forever.

 

*The error is as follows:*

org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBListState.add(RocksDBListState.java:168)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.rocksdb.RocksDBException: While open a file for appending: /tmp/flink-io-b990a9a9-fb96-4d30-bc60-2a7cce923887/job_0cb5b535f8b9a74302233a9a520c828e_op_WindowOperator_d3340fe028bd2a0d8d6de62cea3d11cf__14_20__uuid_6f52bf03-612c-4d13-bd72-76893dfcf709/db/000013.sst: No such file or directory
    at org.rocksdb.RocksDB.merge(Native Method)
    at org.rocksdb.RocksDB.merge(RocksDB.java:683)
    at org.apache.flink.contrib.streaming.state.RocksDBListState.add(RocksDBListState.java:161)
    ... 13 more

 

*The thread stack is as follows:*

"Window(ProcessingTimeSessionWindows(30000), ProcessingTimeTrigger, ProcessWindowFunction$2) (8/12)" #1733 prio=5 os_prio=0 cpu=95.13ms elapsed=1829.37s tid=0x00007f3550015000 nid=0x98a8 runnable  [0x00007f33c88e6000]
   java.lang.Thread.State: RUNNABLE
    at org.rocksdb.RocksDB.disposeInternal(Native Method)
    at org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
    at org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:57)
    at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:279)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:366)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.lambda$dispose$1(StreamOperatorStateHandler.java:131)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler$$Lambda$929/0x00000017ce31cc40.close(Unknown Source)
    at org.apache.flink.shaded.guava18.com.google.common.io.Closer.close(Closer.java:214)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.dispose(StreamOperatorStateHandler.java:133)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:298)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:114)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:703)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:635)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(java.base@11.0.2/Thread.java:834)

   Locked ownable synchronizers:
    - None
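
To illustrate what "not hanging forever" could look like, here is a minimal sketch that runs a blocking close() on a daemon helper thread and waits for it with a timeout, so the calling thread stays responsive to interrupts. BoundedClose and closeWithTimeout are hypothetical names for illustration only; they are not part of Flink or RocksDB, and this is not how Flink disposes its state backend.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of Flink or RocksDB. */
final class BoundedClose {

    /**
     * Runs resource.close() on a daemon helper thread and waits at most timeoutMillis.
     * Returns true if close() returned in time, false if it is still running.
     * Throws InterruptedException if the caller is interrupted while waiting.
     */
    static boolean closeWithTimeout(AutoCloseable resource, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        Thread closer = new Thread(() -> {
            try {
                resource.close();
            } catch (Exception e) {
                // Swallow the exception, similar in spirit to a "close quietly" helper.
            } finally {
                done.countDown();
            }
        }, "bounded-close");
        closer.setDaemon(true); // a stuck close must not keep the JVM alive
        closer.start();
        // await() is interruptible, unlike a thread blocked inside a native method.
        return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a native close that never returns and ignores interrupts.
        CountDownLatch never = new CountDownLatch(1);
        AutoCloseable stuck = () -> {
            while (true) {
                try {
                    never.await();
                    return;
                } catch (InterruptedException ignored) {
                    // keep blocking, like a native call that does not see interrupts
                }
            }
        };
        System.out.println("fast close finished: " + closeWithTimeout(() -> { }, 1000));
        System.out.println("stuck close finished: " + closeWithTimeout(stuck, 200));
    }
}
```

This only bounds how long the caller waits. If the close never returns, the helper thread and whatever native resources it holds are leaked, so the underlying hang would still need a fix on the RocksDB side.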

> RocksDB.disposeInternal does not react to interrupts, blocks task cancellation
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-5463
>                 URL: https://issues.apache.org/jira/browse/FLINK-5463
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.2.0, 1.9.0
>            Reporter: Robert Metzger
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> I'm using Flink 699f4b0.
> My Flink job is slow to cancel because RocksDB seems to be busy disposing its state:
> {code}
> 2017-01-11 18:48:23,315 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code TriggerWindow(TumblingEventTimeWindows(4), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1) (2accc6ca2727c4f7ec963318fbd237e9).
> 2017-01-11 18:48:53,318 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'TriggerWindow(TumblingEventTimeWindows(4), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:273)
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:439)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:340)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
> java.lang.Thread.run(Thread.java:745)
> 2017-01-11 18:48:53,319 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'TriggerWindow(TumblingEventTimeWindows(4), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:273)
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:439)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:340)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
> java.lang.Thread.run(Thread.java:745)
> 2017-01-11 18:49:23,319 WARN  org.apache.flink.runtime.taskmanager.Task                     - Task 'TriggerWindow(TumblingEventTimeWindows(4), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1)' did not react to cancelling signal, but is stuck in method:
>  org.rocksdb.RocksDB.disposeInternal(Native Method)
> org.rocksdb.RocksObject.disposeInternal(RocksObject.java:37)
> org.rocksdb.AbstractImmutableNativeReference.close(AbstractImmutableNativeReference.java:56)
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.dispose(RocksDBKeyedStateBackend.java:250)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.dispose(AbstractStreamOperator.java:331)
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:169)
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.dispose(WindowOperator.java:273)
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:439)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:340)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
> java.lang.Thread.run(Thread.java:745)
> 2017-01-11 18:49:50,080 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for TriggerWindow(TumblingEventTimeWindows(4), ListStateDescriptor{serializer=org.apache.flink.api.java.typeutils.runtime.TupleSerializer@2edcd071}, EventTimeTrigger(), WindowedStream.apply(AllWindowedStream.java:440)) (1/1) (2accc6ca2727c4f7ec963318fbd237e9)
> {code}
> I'm filing this issue because I didn't see this behavior in Flink 1.1. I guess Flink's code should be well behaved when it comes to cancelling tasks.


