You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Arthur Li <li...@126.com> on 2022/04/30 04:09:51 UTC
Flink - 1.11.6 - FsStateBackend没有存储checkpoint
大家好,
我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
1. 启动checkpoint
2. 设置statebackend为FsStateBackend
3. 从socketTextStream读取数据,统计单词个数
(“hello”, 5), (“world”, 1)
4. 通过触发异常,来模拟终止程序
5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
(“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
而在实际输出结果为(“hello”, 1)
环境和版本信息
1. MacOS - Oracle JDK 1.8
2. 版本信息
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.6</flink.version>
<flink.lang>scala</flink.lang>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
</properties>
代码
object RestartStrategyFsStateBackend {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000L)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))
val backendPath = "file:///Users/arthur/Documents/Workspace/java/quickstart" +
"/flink-spring/src/main/resources/backend.out/restartstrategyv3"
env.setStateBackend(new FsStateBackend(backendPath))
// socket数据源
env.socketTextStream("localhost", 7077)
.map(value => {
if (value == "restart") {
throw new RuntimeException("restart is triggered, oooops~~~~~")
}
(value, 1)
}
)
.keyBy(_._1)
.sum(1)
.print("RestartStrategy")
env.execute("RestartStrategy")
}
}
BR.
Arthur
Re: Flink - 1.11.6 - FsStateBackend没有存储checkpoint
Posted by Arthur Li <li...@126.com>.
打扰了,解决了,原因是因为启动时没有配置savepoint路径。
> 2022年4月30日 12:09,Arthur Li <li...@126.com> 写道:
>
> 大家好,
>
>
> 我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
> 1. 启动checkpoint
> 2. 设置statebackend为FsStateBackend
> 3. 从socketTextStream读取数据,统计单词个数
> (“hello”, 5), (“world”, 1)
> 4. 通过触发异常,来模拟终止程序
> 5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
> (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
> 而在实际输出结果为(“hello”, 1)
>
> 环境和版本信息
> 1. MacOS - Oracle JDK 1.8
> 2. 版本信息
> <properties>
> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> <flink.version>1.11.6</flink.version>
> <flink.lang>scala</flink.lang>
> <target.java.version>1.8</target.java.version>
> <scala.binary.version>2.12</scala.binary.version>
> <maven.compiler.source>${target.java.version}</maven.compiler.source>
> <maven.compiler.target>${target.java.version}</maven.compiler.target>
> <log4j.version>2.12.1</log4j.version>
> </properties>
>
> 代码
> object RestartStrategyFsStateBackend {
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.enableCheckpointing(1000L)
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))
>
> val backendPath = "file:///Users/arthur/Documents/Workspace/java/quickstart" +
> "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
> env.setStateBackend(new FsStateBackend(backendPath))
>
> // socket数据源
> env.socketTextStream("localhost", 7077)
> .map(value => {
> if (value == "restart") {
> throw new RuntimeException("restart is triggered, oooops~~~~~")
> }
> (value, 1)
> }
> )
> .keyBy(_._1)
> .sum(1)
> .print("RestartStrategy")
>
> env.execute("RestartStrategy")
> }
> }
>
> BR.
> Arthur
>
>
>
>
>