Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/10/12 15:38:00 UTC

[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

     [ https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-7756:
------------------------------------
    Fix Version/s: 1.4.0

> RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-7756
>                 URL: https://issues.apache.org/jira/browse/FLINK-7756
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP, State Backends, Checkpointing, Streaming
>    Affects Versions: 1.3.2
>         Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>            Reporter: Shashank Agarwal
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS as its file system), the job crashes. When I use FsStateBackend on the same staging cluster (also on HDFS), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. My application has around 20-25 tasks.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil                            - Found Netty's native epoll transport in the classpath, using it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (2/2) (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Co-Flat Map (2/2) (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task                     - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Co-Flat Map (1/2).}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Co-Flat Map (1/2).
> 	... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
> 	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 	... 5 more
> 	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> 		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
> 		... 5 more
> 	Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
> 		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> 		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> 		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
> 		... 7 more
> 	Caused by: java.lang.IllegalStateException
> 		at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
> 		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 		... 5 more
> 	[CIRCULAR REFERENCE:java.lang.IllegalStateException]
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (1/2) (a06925261e74b4efdf50a30089e2b778).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map (1/2) (1747902c96e63fefd977ac4d4a01d2fa).
> 2017-09-29 14:21:34,180 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map (1/2) (a06925261e74b4efdf50a30089e2b778) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator Map (1/2).}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator Map (1/2).
> 	... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
> 	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 	... 5 more
> 	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> 		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
> 		... 5 more
> 	Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException
> 		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> 		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> 		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> 		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
> 		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
> 		... 7 more
> 	Caused by: java.lang.IllegalStateException
> 		at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:878)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
> 		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
> 		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> 		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 		... 5 more
> 	[CIRCULAR REFERENCE:java.lang.IllegalStateException]
> {code}
> The same trace was printed for around 12-13 tasks. Then the following logs were printed:
> {code:java}
> 2017-09-29 14:21:35,039 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Source: Custom Source (2/2) (77c896e2a2063e98f399244cae21c260) [CANCELED]
> 2017-09-29 14:21:35,041 WARN  org.apache.hadoop.ipc.Client                                  - interrupted waiting to send rpc request to server
> java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1454)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> 	at com.sun.proxy.$Proxy12.delete(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy13.delete(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 2017-09-29 14:21:35,042 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory  - Could not delete the checkpoint stream file hdfs://static.175.87.9.5.clients.your-server.de:8020/flink/flink-checkpoints/rocksDB/events/e10dbe09aa2ecccb22737ddce8b4dc9f/chk-2/a28796de-978a-4f1a-8ff5-5f5c654b0ffc.
> java.io.IOException: java.lang.InterruptedException
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1460)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> 	at com.sun.proxy.$Proxy12.delete(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:540)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy13.delete(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.delete(DFSClient.java:2044)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:707)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:703)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:714)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.delete(HadoopFileSystem.java:435)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.delete(SafetyNetWrapperFileSystem.java:106)
> 	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:324)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:826)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeSnapshot(RocksDBKeyedStateBackend.java:875)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:353)
> 	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$1.call(RocksDBKeyedStateBackend.java:350)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.InterruptedException
> 	at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:404)
> 	at java.util.concurrent.FutureTask.get(FutureTask.java:191)
> 	at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1059)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1454)
> 	... 31 more
> 2017-09-29 14:21:35,054 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee).
> 2017-09-29 14:21:35,055 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedCEPPatternOperator -> Flat Map -> (Flat Map, Flat Map) (1/2) (8c6eff62d47c4a624a7554065bac36ee) switched from RUNNING to CANCELING.
> {code}
> Then the same was printed for 12-13 tasks.
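
The first trace wraps a bare java.lang.IllegalStateException in several layers. The following is a minimal, stdlib-only Java sketch (not Flink code) of how that chain can arise, assuming Flink 1.3.2 behaves as the stack frames suggest: a Preconditions.checkState failure inside the snapshot future is surfaced by FutureUtil.runIfNotDoneAndGet as an ExecutionException. The helpers are simplified stand-ins. materializeSnapshot, checkPasses, snapshotFuture and failureCause are hypothetical names, and the condition actually checked at RocksDBKeyedStateBackend.java:878 is not visible in these logs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotFailureSketch {

    // Same semantics as Flink's Preconditions.checkState(boolean): a failed check
    // throws an IllegalStateException that carries no message.
    static void checkState(boolean condition) {
        if (!condition) {
            throw new IllegalStateException();
        }
    }

    // Simplified stand-in for FutureUtil.runIfNotDoneAndGet: run the future on the
    // calling thread if it has not completed yet, then wait for its result.
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (future == null) {
            return null;
        }
        if (!future.isDone()) {
            future.run();
        }
        return future.get();
    }

    // Hypothetical snapshot step: 'checkPasses' stands in for whatever condition the
    // real backend verifies before finishing the incremental snapshot.
    static String materializeSnapshot(boolean checkPasses) {
        checkState(checkPasses);
        return "state-handle";
    }

    // Hypothetical factory for the asynchronous snapshot future.
    static RunnableFuture<String> snapshotFuture(boolean checkPasses) {
        return new FutureTask<String>(() -> materializeSnapshot(checkPasses));
    }

    // Returns the cause that get() reports, or null if the snapshot succeeded.
    static Throwable failureCause(RunnableFuture<String> future) {
        try {
            runIfNotDoneAndGet(future);
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        RunnableFuture<String> failing = snapshotFuture(false);
        // First get(): the checkpoint thread materializing the snapshot.
        Throwable first = failureCause(failing);
        // Second get(): a cleanup path discarding the same, already completed future.
        Throwable second = failureCause(failing);
        System.out.println(first);
        System.out.println(first == second);
    }
}
```

Running main prints java.lang.IllegalStateException followed by true. A completed FutureTask rethrows the same stored exception on every get(), so a cleanup path that calls get() again on the failed future (as OperatorSnapshotResult.cancel does via StateUtil.discardStateFuture in the trace) reports the very instance that already failed the checkpoint. This is consistent with the [CIRCULAR REFERENCE:java.lang.IllegalStateException] marker in the log.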



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)