You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Sihua Zhou (JIRA)" <ji...@apache.org> on 2017/07/14 14:42:00 UTC

[jira] [Comment Edited] (FLINK-7180) CoGroupStream perform checkpoint failed

    [ https://issues.apache.org/jira/browse/FLINK-7180?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16087397#comment-16087397 ] 

Sihua Zhou edited comment on FLINK-7180 at 7/14/17 2:41 PM:
------------------------------------------------------------

[~aljoscha] Sorry to late reply, i was on the train all the time. Here is the stack trace info.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
	... 5 more
	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
		... 5 more
	Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
		... 7 more
	Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
		at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
		at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
		at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
		at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:183)
		at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:47)
		at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389)
		at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
		... 5 more
	[CIRCULAR REFERENCE:java.lang.UnsupportedOperationException: This serializer is not registered for managed state.]


was (Author: sihuazhou):
[~aljoscha] Sorry to reply late reply, i was on the train all the time. Here is the stack trace info.

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:963)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator TriggerWindow(sloth.SlothJoinWindow@e45f292, ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@e42116a0}, sloth.SlothWindowTrigger@31a5c39e, WindowedStream.apply(CoGroupedStreams.java:300)) (2/4).
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
	... 5 more
	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1018)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:957)
		... 5 more
	Caused by: java.util.concurrent.ExecutionException: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
		... 7 more
	Caused by: java.lang.UnsupportedOperationException: This serializer is not registered for managed state.
		at org.apache.flink.streaming.api.datastream.CoGroupedStreams$UnionSerializer.snapshotConfiguration(CoGroupedStreams.java:555)
		at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
		at org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot.<init>(CollectionSerializerConfigSnapshot.java:39)
		at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:183)
		at org.apache.flink.api.common.typeutils.base.ListSerializer.snapshotConfiguration(ListSerializer.java:47)
		at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.snapshot(RegisteredKeyedBackendStateMetaInfo.java:71)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateMetaData(RocksDBKeyedStateBackend.java:591)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:510)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:407)
		at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:389)
		at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
		at java.util.concurrent.FutureTask.run(FutureTask.java:266)
		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:893)
		... 5 more
	[CIRCULAR REFERENCE:java.lang.UnsupportedOperationException: This serializer is not registered for managed state.]

> CoGroupStream perform checkpoint failed
> ---------------------------------------
>
>                 Key: FLINK-7180
>                 URL: https://issues.apache.org/jira/browse/FLINK-7180
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.1
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> When using the CoGroup api and enable the checkpoint, Job will failed when performing checkpoint, e.g:
> {code:java}
>         input1.coGroup(input2)
>                 .where(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .equalTo(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .window(SlothJoinWindow.create())
>                 .trigger(new SlothWindowTrigger(0))
>                 .apply(new CoGroupFunction<String, String, String>() {
>                     @Override
>                     public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
>                         String outputStr = "first:" + first + " , second:" + second;
>                         System.out.println(outputStr);
>                         out.collect(outputStr);
>                     }
>                 })
>                 .keyBy(new KeySelector<String, String>() {
>                     @Override
>                     public String getKey(String value) throws Exception {
>                         return value;
>                     }
>                 })
>                 .print();
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)