You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by sun <13...@qq.com> on 2020/09/03 08:13:30 UTC

回复:无法从checkpoint中恢复state

啥?




------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                                                        "user-zh"                                                                                    <13162790856@163.com&gt;;
发送时间:&nbsp;2020年9月3日(星期四) 下午4:10
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re:无法从checkpoint中恢复state















再启动服务的时候 需要指定checkpoint回复地址,你这里只是指定了做checkpint地址 





在 2020-09-03 16:03:41,"sun" <1392427699@qq.com&gt; 写道:
&gt;你好,我有2个问题
&gt;
&gt;1:每次重启服务,checkpoint的目录中chk-&amp;nbsp; 总是从chk-1开始,chk-2 ........,没有从上次的编号开始
&gt;
&gt;2:重启服务后,没有从checkpoint中恢复state的数据
&gt;
&gt;下面是我的配置,我是在本地调试的,单机
&gt;
&gt;
&gt;
&gt;final StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
&gt;
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StateBackend stateBackend = new RocksDBStateBackend("hdfs://10.100.51.101:9000/flink/checkpoints",true);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StateBackend stateBackend = new FsStateBackend("file:///flink/checkpoints");
&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StateBackend stateBackend = new MemoryStateBackend();
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.setStateBackend(stateBackend);
&gt;
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.enableCheckpointing(1000);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.getCheckpointConfig().setCheckpointTimeout(60000);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; streamExecutionEnvironment.getCheckpointConfig()
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);