Posted to user-zh@flink.apache.org by likai <11...@qq.com> on 2020/09/10 02:39:43 UTC
Flink 1.5.0 savepoint fails
Hi all. Triggering a savepoint fails on Flink 1.5.0.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share
The default file system in the Hadoop configuration is hdfs://flink-hdfs.
Error:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, _UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: (CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
... 18 more
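When testing locally, Flink 1.5.0 can trigger savepoints. In the online environment, the configured checkpoint directory is hdfs:XXX. We also have another online cluster running 1.5.6 that can trigger savepoints, and the flink-conf.yaml configuration of the two clusters is identical.
Does anyone know what is going on, or can anyone suggest how to troubleshoot this? Many thanks.
Re: Flink 1.5.0 savepoint fails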
Posted by Congxian Qiu <qc...@gmail.com>.
Hi
Judging from the stack trace, the failure is caused by Wrong FS:
file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1,
expected: hdfs://flink-hdfs. Can you write the savepoint to the hdfs://flink-hdfs cluster instead?
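The Wrong FS error comes from a scheme/authority check: a Hadoop file system instance bound to hdfs://flink-hdfs refuses a path that names a different file system, such as file:///. The Java sketch below reproduces that comparison in simplified form using only java.net.URI. It is an illustration of the idea, not Hadoop's actual FileSystem.checkPath code, and the names WrongFsCheck, sameFileSystem and checkPath are hypothetical.

```java
import java.net.URI;

/**
 * Simplified, hypothetical sketch of the check behind Hadoop's "Wrong FS" error.
 * A file system bound to one URI (e.g. hdfs://flink-hdfs) rejects paths whose
 * scheme or authority point to a different file system.
 * This is NOT Hadoop's real FileSystem.checkPath implementation.
 */
public class WrongFsCheck {

    /** Returns true if a file system bound to fsUri would accept the given path. */
    static boolean sameFileSystem(URI fsUri, URI path) {
        String pathScheme = path.getScheme();
        // A path without a scheme is resolved against this file system, so accept it.
        if (pathScheme == null) {
            return true;
        }
        // Different scheme (e.g. "file" vs "hdfs") means a different file system.
        if (!pathScheme.equalsIgnoreCase(fsUri.getScheme())) {
            return false;
        }
        String pathAuthority = path.getAuthority();
        // In this simplified sketch, a missing authority (e.g. hdfs:///tmp) is accepted;
        // otherwise the authority (cluster name / host) must match.
        return pathAuthority == null || pathAuthority.equalsIgnoreCase(fsUri.getAuthority());
    }

    /** Throws an error shaped like Hadoop's message when the path does not match. */
    static void checkPath(URI fsUri, URI path) {
        if (!sameFileSystem(fsUri, path)) {
            throw new IllegalArgumentException("Wrong FS: " + path + ", expected: " + fsUri);
        }
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://flink-hdfs");
        URI target = URI.create("file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080");
        try {
            checkPath(hdfs, target);
            System.out.println("accepted: " + target);
        } catch (IllegalArgumentException e) {
            // prints: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080, expected: hdfs://flink-hdfs
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, a target such as hdfs://flink-hdfs/savepoints passes the check, which is why writing the savepoint to that cluster avoids the error. The sketch only models the final check; it does not show why the savepoint stream was opened through the HDFS file system in the first place.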
Best,
Congxian
On Fri, Sep 11, 2020 at 2:46 PM, hk__lrzy <hk...@163.com> wrote:
> Did your code explicitly set the state backend address?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Re: Flink 1.5.0 savepoint fails
Posted by hk__lrzy <hk...@163.com>.
Did your code explicitly set the state backend address?