Posted to user-zh@flink.apache.org by likai <11...@qq.com> on 2020/09/10 02:39:43 UTC

Flink 1.5.0 savepoint failure

Hi all. I'm using Flink 1.5.0 and triggering a savepoint fails.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share
The default filesystem in the Hadoop conf is hdfs://flink-hdfs
Error:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, _UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: (CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> Sink: Unnamed (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
	at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
	... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
	at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
	at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
	... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
	at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
	at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
	... 18 more
When testing locally, 1.5.0 can trigger savepoints. The checkpoint directory configured in the online environment is hdfs:XXX. There is also a 1.5.6 online cluster that can trigger savepoints. The flink-conf.yaml configuration is identical on both clusters.
Does anyone know what is going on, or can you suggest how to troubleshoot it? Many thanks.
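
For reference, the flink-conf.yaml keys that typically govern where Flink 1.5 writes checkpoints and savepoints are sketched below; the paths are hypothetical placeholders rather than the poster's actual values, and comparing these entries between the 1.5.0 and 1.5.6 clusters would be one way to start troubleshooting:

state.backend: filesystem
state.checkpoints.dir: hdfs://flink-hdfs/flink/checkpoints
state.savepoints.dir: hdfs://flink-hdfs/flink/savepoints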





Re: Flink 1.5.0 savepoint failure

Posted by Congxian Qiu <qc...@gmail.com>.
Hi
   From the stack trace, this is caused by "Wrong FS:
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
expected: hdfs://flink-hdfs". Can you write the savepoint to the hdfs://flink-hdfs cluster instead?
Best,
Congxian


hk__lrzy <hk...@163.com> wrote on Fri, Sep 11, 2020 at 2:46 PM:

> Did the code explicitly set the state backend address?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
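
A minimal sketch of Congxian's suggestion above, using the job ID from the original post and a hypothetical target directory on the hdfs://flink-hdfs cluster:

bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 hdfs://flink-hdfs/flink/savepoints

If this succeeds, it would point to the file:// target (rather than HDFS) being the problematic part of the original command.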

Re: Flink 1.5.0 savepoint failure

Posted by hk__lrzy <hk...@163.com>.
Did the code explicitly set the state backend address?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
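
For reference, an explicitly configured state backend in the job code, which hk__lrzy is asking about, would look roughly like the Flink 1.5 sketch below; the HDFS path is a hypothetical placeholder, and any such line in the job is worth checking because it takes precedence over the cluster's flink-conf.yaml:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A backend set in code overrides the flink-conf.yaml settings;
        // the checkpoint URI below is a hypothetical placeholder.
        env.setStateBackend(new FsStateBackend("hdfs://flink-hdfs/flink/checkpoints"));
        // ... the rest of the job (sources, transformations, env.execute(...)) would follow here
    }
}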