Posted to user-zh@flink.apache.org by JasonLee <17...@163.com> on 2020/07/18 01:02:49 UTC

Re: state cannot be restored from checkpoint

Hi,
On the checkpoint page of the web UI you can see whether the job was restored from the last successful checkpoint. Please confirm that first.
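
The same information the UI shows can also be read from the JobManager's REST endpoint. The sketch below is a minimal, hedged example. It assumes Java 11+ (for java.net.http), a REST address such as http://localhost:8081 and a job id passed as arguments, and that the /jobs/<jobid>/checkpoints response carries a non-null "restored" object under "latest" when the job started from a checkpoint or savepoint. The class and method names are made up for illustration, and the regex match is a shortcut rather than real JSON parsing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Pattern;

public class RestoredCheckpointCheck {

    // Matches a "restored" key whose value is an object, which skips both
    // "restored": null and the numeric counter of the same name.
    private static final Pattern RESTORED_OBJECT =
            Pattern.compile("\"restored\"\\s*:\\s*\\{");

    /** Builds the checkpoint-statistics URI for one job. */
    static URI checkpointsUri(String restBase, String jobId) {
        String base = restBase.endsWith("/")
                ? restBase.substring(0, restBase.length() - 1)
                : restBase;
        return URI.create(base + "/jobs/" + jobId + "/checkpoints");
    }

    /** Returns true if the statistics JSON reports a restore (assumed response shape). */
    static boolean reportsRestore(String json) {
        return RESTORED_OBJECT.matcher(json).find();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: REST base URL (assumption, e.g. http://localhost:8081); args[1]: job id
        URI uri = checkpointsUri(args[0], args[1]);
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(reportsRestore(response.body())
                ? "Job reports a restore from a checkpoint or savepoint"
                : "No restore reported for this job");
    }
}
```

If no restore is reported, the job started with empty state, and the place to look is how the job was resubmitted rather than the state code itself.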


JasonLee
Email: 17610775726@163.com

On 2020-07-17 17:21, sun wrote:
Hello: I print the contents of counts with the code below:
           List<String> list = Lists.newArrayList(counts.get());
           for (String ss : list) {
               System.out.println("!!!" + ss);
               log.info("!!!" + ss);
           }
However, after I restart the service, the entries stored earlier are no longer printed.
@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

    private transient ListState<String> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }

    @Override
    public void flatMap(String s, Collector<Test222> collector) throws Exception {
        Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
        });

        System.out.println(DateUtil.toLongDateString(new Date()));
        log.info(DateUtil.toLongDateString(new Date()));
        counts.add(message.getId());
        List<String> list = Lists.newArrayList(counts.get());
        for (String ss : list) {
            System.out.println("!!!" + ss);
            log.info("!!!" + ss);
        }
        log.info(DateUtil.toLongDateString(new Date()));
        System.out.println(DateUtil.toLongDateString(new Date()));
    }
}
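
A note on the TTL settings in the class above: with a 30-minute TTL, OnCreateAndWrite and NeverReturnExpired, an element that was written more than 30 minutes before it is read is not returned, and that holds after a successful restore as well, because the write timestamp is stored with the element. The following is a simplified stand-alone model of that visibility rule, not Flink's implementation. The class TtlListModel and its methods are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class TtlListModel {

    /** One list element together with the time it was written. */
    private static final class Entry {
        final String value;
        final long writtenAtMs;

        Entry(String value, long writtenAtMs) {
            this.value = value;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final long ttlMs;
    private final List<Entry> entries = new ArrayList<>();

    TtlListModel(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Models OnCreateAndWrite: the timestamp is set when the element is added. */
    void add(String value, long nowMs) {
        entries.add(new Entry(value, nowMs));
    }

    /** Models NeverReturnExpired: elements older than the TTL are filtered out on read. */
    List<String> get(long nowMs) {
        List<String> visible = new ArrayList<>();
        for (Entry e : entries) {
            if (nowMs - e.writtenAtMs < ttlMs) {
                visible.add(e.value);
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        TtlListModel counts = new TtlListModel(TimeUnit.MINUTES.toMillis(30));
        counts.add("a", 0L);
        counts.add("b", TimeUnit.MINUTES.toMillis(20));
        // Read 31 minutes after "a" was written: "a" has expired, "b" has not.
        System.out.println(counts.get(TimeUnit.MINUTES.toMillis(31)));
    }
}
```

So when comparing output before and after a restart, it helps to check whether the missing entries were written within the last 30 minutes.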










------------------ Original Message ------------------
From: "user-zh" <qcx978132955@gmail.com>
Sent: Thursday, July 16, 2020, 8:16 PM
To: "user-zh" <user-zh@flink.apache.org>

Subject: Re: state cannot be restored from checkpoint



Hi

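1. Could you describe in more detail how the data in counts was lost? What did you expect, and what did you actually observe?
2. Could you also post the rest of your code that uses counts?
3. Was your job actually restored from a checkpoint? You can check this in the JM log.
4. If you are sure that data was lost, you could use the State Processor API [1] to find out whether the problem lies in serializing the state or in restoring it.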
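
For point 3, a small helper can pull the relevant lines out of a JobManager log. This is only a sketch under assumptions: the exact wording of the log messages differs between Flink versions, so it simply keeps every line that mentions both a restore and a checkpoint or savepoint. The class name RestoreLogScan and the log path argument are hypothetical.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class RestoreLogScan {

    /** Keeps the lines that mention a restore together with a checkpoint or savepoint. */
    static List<String> restoreLines(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            String lower = line.toLowerCase(Locale.ROOT);
            boolean mentionsRestore = lower.contains("restor");
            boolean mentionsSnapshot = lower.contains("checkpoint") || lower.contains("savepoint");
            if (mentionsRestore && mentionsSnapshot) {
                hits.add(line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the JobManager log file (its location depends on the setup)
        List<String> hits = restoreLines(Files.readAllLines(Paths.get(args[0])));
        if (hits.isEmpty()) {
            System.out.println("No restore-related lines found");
        }
        for (String hit : hits) {
            System.out.println(hit);
        }
    }
}
```

An empty result suggests, but does not prove, that the job started without restoring any state.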

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <1392427699@qq.com> wrote on Thursday, July 16, 2020 at 6:16 PM:

>
> Configuration code:
> env.enableCheckpointing(1000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> // Do not restart the job after a failure
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>
> Code that uses the state:
> private transient ListState<String> counts;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     StateTtlConfig ttlConfig = StateTtlConfig
>             .newBuilder(Time.minutes(30))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build();
>
>     ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
>     lastUserLogin.enableTimeToLive(ttlConfig);
>     counts = getRuntimeContext().getListState(lastUserLogin);
> }
>
> After I restarted the task managers, I found that all the data in counts was lost.