You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Arthur Li <li...@126.com> on 2022/04/30 04:09:51 UTC

Flink - 1.11.6 - FsStateBackend没有存储checkpoint

大家好,


我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
1. 启动checkpoint
2. 设置statebackend为FsStateBackend
3. 从socketTextStream读取数据,统计单词个数
    (“hello”, 5), (“world”, 1)
4. 通过触发异常,来模拟终止程序
5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
    (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
    而在实际输出结果为(“hello”, 1)

环境和版本信息
1. MacOS - Oracle JDK 1.8
2. 版本信息
<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.11.6</flink.version>
   <flink.lang>scala</flink.lang>
   <target.java.version>1.8</target.java.version>
   <scala.binary.version>2.12</scala.binary.version>
   <maven.compiler.source>${target.java.version}</maven.compiler.source>
   <maven.compiler.target>${target.java.version}</maven.compiler.target>
   <log4j.version>2.12.1</log4j.version>
</properties>

代码
object RestartStrategyFsStateBackend {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(1000L) 
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))

    val backendPath = "file:///Users/arthur/Documents/Workspace/java/quickstart" +
      "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
    env.setStateBackend(new FsStateBackend(backendPath))

    // socket数据源
    env.socketTextStream("localhost", 7077)
      .map(value => {
        if (value == "restart") {
          throw new RuntimeException("restart is triggered, oooops~~~~~")
        }
        (value, 1)
      }
      )
      .keyBy(_._1)
      .sum(1)
      .print("RestartStrategy")

    env.execute("RestartStrategy")
  }
}

BR.
Arthur





 

Re: Flink - 1.11.6 - FsStateBackend没有存储checkpoint

Posted by Arthur Li <li...@126.com>.
打扰了,解决了,原因是因为启动时没有配置savepoint路径。


> 2022年4月30日 12:09,Arthur Li <li...@126.com> 写道:
> 
> 大家好,
> 
> 
> 我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
> 1. 启动checkpoint
> 2. 设置statebackend为FsStateBackend
> 3. 从socketTextStream读取数据,统计单词个数
>    (“hello”, 5), (“world”, 1)
> 4. 通过触发异常,来模拟终止程序
> 5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
>    (“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello”, 6)
>    而在实际输出结果为(“hello”, 1)
> 
> 环境和版本信息
> 1. MacOS - Oracle JDK 1.8
> 2. 版本信息
> <properties>
>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>   <flink.version>1.11.6</flink.version>
>   <flink.lang>scala</flink.lang>
>   <target.java.version>1.8</target.java.version>
>   <scala.binary.version>2.12</scala.binary.version>
>   <maven.compiler.source>${target.java.version}</maven.compiler.source>
>   <maven.compiler.target>${target.java.version}</maven.compiler.target>
>   <log4j.version>2.12.1</log4j.version>
> </properties>
> 
> 代码
> object RestartStrategyFsStateBackend {
>  def main(args: Array[String]): Unit = {
>    val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>    env.enableCheckpointing(1000L) 
>    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 2))
> 
>    val backendPath = "file:///Users/arthur/Documents/Workspace/java/quickstart" +
>      "/flink-spring/src/main/resources/backend.out/restartstrategyv3"
>    env.setStateBackend(new FsStateBackend(backendPath))
> 
>    // socket数据源
>    env.socketTextStream("localhost", 7077)
>      .map(value => {
>        if (value == "restart") {
>          throw new RuntimeException("restart is triggered, oooops~~~~~")
>        }
>        (value, 1)
>      }
>      )
>      .keyBy(_._1)
>      .sum(1)
>      .print("RestartStrategy")
> 
>    env.execute("RestartStrategy")
>  }
> }
> 
> BR.
> Arthur
> 
> 
> 
> 
>