Posted to user-zh@flink.apache.org by sun <13...@qq.com> on 2020/07/16 10:16:45 UTC

State cannot be restored from checkpoint

Configuration code:

env.enableCheckpointing(1000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Do not restart the job after a failure
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));

Code that uses the state:

private transient ListState<String> counts;


@Override
public void open(Configuration parameters) throws Exception {
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.minutes(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
    lastUserLogin.enableTimeToLive(ttlConfig);
    counts = getRuntimeContext().getListState(lastUserLogin);
}

After I restarted the task managers, I found that all the data in counts was lost.

Re: State cannot be restored from checkpoint

Posted by JasonLee <17...@163.com>.
Hi,
In the checkpoint section of the web UI you can see whether the job was restored from the last successful checkpoint. Please confirm that first.


JasonLee

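On 2020-07-17 at 17:21, sun wrote:
Hello: I print the contents of counts with the following code:

List<String> list = Lists.newArrayList(counts.get());
for (String ss : list) {
    System.out.println("!!!" + ss);
    log.info("!!!" + ss);
}

However, after I restarted the service, the entries stored earlier are no longer printed.
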
@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

    private transient ListState<String> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }

    @Override
    public void flatMap(String s, Collector<Test222> collector) throws Exception {
        Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
        });

        System.out.println(DateUtil.toLongDateString(new Date()));
        log.info(DateUtil.toLongDateString(new Date()));
        counts.add(message.getId());
        List<String> list = Lists.newArrayList(counts.get());
        for (String ss : list) {
            System.out.println("!!!" + ss);
            log.info("!!!" + ss);
        }
        log.info(DateUtil.toLongDateString(new Date()));
        System.out.println(DateUtil.toLongDateString(new Date()));
    }
}










------------------ Original message ------------------
From: "user-zh" <qcx978132955@gmail.com>
Sent: Thursday, July 16, 2020, 8:16 PM
To: "user-zh" <user-zh@flink.apache.org>

Subject: Re: State cannot be restored from checkpoint



Hi

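1. Could you describe in more detail how the counts data was lost? What did you expect, and what did you actually observe?
2. Could you also post the rest of your code that touches counts?
3. Was your job actually restored from a checkpoint? You can check this in the JM log.
4. If you are sure that data was lost, you could use the State Processor API [1] to check whether the problem is in serializing the state out or in restoring it.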

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian



Re: State cannot be restored from checkpoint

Posted by sun <13...@qq.com>.
I am not very familiar with the JM log, so I cannot tell whether the job was restored from a checkpoint. Here is the log:


18:08:07.615 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 116 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 13 ms).
18:08:08.615 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 117 @ 1595239688615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:08.626 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 117 for job acd456ff6f2f9f59ee89b126503c20f0 (74305 bytes in 11 ms).
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink (acd456ff6f2f9f59ee89b126503c20f0) switched from state RUNNING to CANCELLING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from RUNNING to CANCELING.
18:08:09.354 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (87d4c7af7d5fb5f81bae48aae77de473) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (9035e059e465b8c520edf37ec734b43e) switched from RUNNING to CANCELING.
18:08:09.355 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (e6ff47b0da505b2aa4d775d7821b8356) switched from RUNNING to CANCELING.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (e6ff47b0da505b2aa4d775d7821b8356) switched from CANCELING to CANCELED.
18:08:09.377 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (9035e059e465b8c520edf37ec734b43e) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (7dfdd54faf11bc364fb6afc3dfdfb4dd) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (87d4c7af7d5fb5f81bae48aae77de473) switched from CANCELING to CANCELED.
18:08:09.378 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (1/4) (4d8a61b0a71ff37d1e7d7da578878e55) switched from CANCELING to CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (2/4) (97909ed1fcf34f658a3b6d9b3e8ee412) switched from CANCELING to CANCELED.
18:08:09.379 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (7be70346e0c7fc8f2b2224ca3a0907f0) switched from CANCELING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-416] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (4/4) (4df2905ee56b06d9fc384e4beb228015) switched from CANCELING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink (acd456ff6f2f9f59ee89b126503c20f0) switched from state CANCELLING to CANCELED.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Stopping checkpoint coordinator for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  o.a.f.runtime.checkpoint.StandaloneCompletedCheckpointStore  - Shutting down
18:08:09.381 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.checkpoint.CompletedCheckpoint  - Checkpoint with ID 117 at 'file:/opt/flink/flink-1.7.2/checkpoints/acd456ff6f2f9f59ee89b126503c20f0/chk-117' not discarded.
18:08:09.382 [flink-akka.actor.default-dispatcher-427] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job acd456ff6f2f9f59ee89b126503c20f0 reached globally terminal state CANCELED.
18:08:09.384 [flink-akka.actor.default-dispatcher-416] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the JobMaster for job ty-bi-flink(acd456ff6f2f9f59ee89b126503c20f0).
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Close ResourceManager connection 7f7791cdc957a13cfaf639062c495fb9: JobManager is shutting down..
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping SlotPool.
18:08:09.385 [flink-akka.actor.default-dispatcher-416] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Disconnect job manager 00000000000000000000000000000000@akka.tcp://flink@rcx51101:6123/user/jobmanager_4 for job acd456ff6f2f9f59ee89b126503c20f0 from the resource manager.
18:08:09.385 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManagerRunner already shutdown.
18:08:33.384 [flink-rest-server-netty-worker-thread-4] WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting job 6dbecb3e4f536c2c92ca7931cba54fd2 (ty-bi-flink).
18:08:34.205 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_5 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Initializing job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Using restart strategy NoRestartStrategy for ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/c8a89ca4-afcc-41c0-b121-bbfe4354e502 .
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Running initialization on master for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2).
18:08:34.206 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Successfully ran initialization on master in 0 ms.
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'file:/opt/flink/flink-1.7.2/checkpoints', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1), localRocksDbDirectories=null, enableIncrementalCheckpointing=UNDEFINED}
18:08:34.207 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Configuring application-defined state backend with job/cluster config
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobManagerRunner  - JobManager runner for job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@rcx51101:6123/user/jobmanager_5.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2)
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job ty-bi-flink (6dbecb3e4f536c2c92ca7931cba54fd2) switched from state CREATED to RUNNING.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from CREATED to SCHEDULED.
18:08:34.208 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Connecting to ResourceManager akka.tcp://flink@rcx51101:6123/user/resourcemanager(00000000000000000000000000000000)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{092c50cc73f659cbca805205e07b239c}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{16e0d6c68cbbf62c056758903c129661}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}]
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved ResourceManager address, beginning registration
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Registration at ResourceManager attempt 1 (timeout=100ms)
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@rcx51101:6123/user/jobmanager_5 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-418] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@rcx51101:6123/user/jobmanager_5 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
18:08:34.209 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{bb8a60407b3fa9329ccc1ae8454bf239}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{f727b58cc9c5abe1627216c5973f98b5}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{db118025945481bba66b8ffa734e4202}.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{16e0d6c68cbbf62c056758903c129661}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-420] INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting new slot [SlotRequestId{092c50cc73f659cbca805205e07b239c}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{d5147bcc731a51f09bdb32e366d93b02}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{14001529dd0d04ebbd169241cb59f918}.
18:08:34.210 [flink-akka.actor.default-dispatcher-415] INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 6dbecb3e4f536c2c92ca7931cba54fd2 with allocation id AllocationID{bb02373b91c626c6fde666512d5b62ed}.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source (1/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source (3/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source (2/4) (attempt #0) to rcx51102
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from SCHEDULED to DEPLOYING.
18:08:34.219 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying Source: Custom Source (4/4) (attempt #0) to rcx51102
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from SCHEDULED to DEPLOYING.
18:08:34.220 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying map_sub_order_detail -> Sink: Print to Std. Out (4/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying map_sub_order_detail -> Sink: Print to Std. Out (3/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying map_sub_order_detail -> Sink: Print to Std. Out (2/4) (attempt #0) to rcx51102
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from SCHEDULED to DEPLOYING.
18:08:34.222 [flink-akka.actor.default-dispatcher-378] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Deploying map_sub_order_detail -> Sink: Print to Std. Out (1/4) (attempt #0) to rcx51102
18:08:34.506 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Checkpoint triggering task Source: Custom Source (1/4) of job 6dbecb3e4f536c2c92ca7931cba54fd2 is not in state RUNNING but DEPLOYING instead. Aborting checkpoint.
18:08:35.036 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (3/4) (989517f5535736062e6ce870e30742ee) switched from DEPLOYING to RUNNING.
18:08:35.037 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (2/4) (c0e50cbfbab0b29973cc517056f3f561) switched from DEPLOYING to RUNNING.
18:08:35.057 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (4/4) (eaf47a632d3e735f1341e1d6d4ec7b7f) switched from DEPLOYING to RUNNING.
18:08:35.058 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - map_sub_order_detail -> Sink: Print to Std. Out (1/4) (4127cdcd8ad7bd2011b7f8a8330663b9) switched from DEPLOYING to RUNNING.
18:08:35.069 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (4/4) (9797fe0ec397922dff0c8bde4fb89ba2) switched from DEPLOYING to RUNNING.
18:08:35.070 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (1/4) (c06f0e753f644bdbcfe50cc8d2364cf6) switched from DEPLOYING to RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-418] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (3/4) (2a8cf04be945d59a70a3d82f50b38cd6) switched from DEPLOYING to RUNNING.
18:08:35.076 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source (2/4) (5436dd5759d18472fcf171f5df9d9bc9) switched from DEPLOYING to RUNNING.
18:08:35.506 [Checkpoint Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Triggering checkpoint 1 @ 1595239715506 for job 6dbecb3e4f536c2c92ca7931cba54fd2.
18:08:35.530 [flink-akka.actor.default-dispatcher-430] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed checkpoint 1 for job 6dbecb3e4f536c2c92ca7931cba54fd2 (74134 bytes in 24 ms).







------------------ Original message ------------------
From: "user-zh" <qcx978132955@gmail.com>
Sent: Friday, July 17, 2020, 10:58 PM
To: "user-zh" <user-zh@flink.apache.org>

Subject: Re: State cannot be restored from checkpoint



Hi

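1. You still need to answer the question I asked earlier: check the JM log to see whether the job was restored from a checkpoint.
2. The fact that nothing is printed here only shows that the key currently being processed has no state data. It does not show that the state was not restored. A state value is bound to a specific key (the key from keyBy).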

Best,
Congxian
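
The point about state being bound to a key can be illustrated with a small simulation. This is plain Java, not the Flink API: the class KeyedListStateSketch, its methods, and the keys "user-a" and "user-b" are hypothetical names chosen for the sketch. It only models the idea that a read through the state handle sees the list stored for the current key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedListStateSketch {

    // One list per key; a read or write only touches the list of the current key.
    private final Map<String, List<String>> stateByKey = new HashMap<>();
    private String currentKey;

    /** Models the runtime switching to the key of the record being processed. */
    public void setCurrentKey(String key) {
        this.currentKey = key;
    }

    /** Appends a value to the list that belongs to the current key. */
    public void add(String value) {
        stateByKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    /** Returns the list for the current key, or an empty list if that key has none. */
    public List<String> get() {
        return stateByKey.getOrDefault(currentKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        KeyedListStateSketch counts = new KeyedListStateSketch();

        counts.setCurrentKey("user-a");
        counts.add("id-1");
        counts.add("id-2");

        // A record with a different key sees no entries, although state exists.
        counts.setCurrentKey("user-b");
        System.out.println(counts.get()); // prints []

        // A record with the original key sees its entries again.
        counts.setCurrentKey("user-a");
        System.out.println(counts.get()); // prints [id-1, id-2]
    }
}
```

Under this model, an empty printout after a restart is only conclusive when the record being processed has a key that was also written before the restart.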



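Re: State cannot be restored from checkpoint

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

1. You need to answer the question I asked earlier: check the JM log to see whether the job was restored from a checkpoint.
2. The missing output only shows that the key currently being processed has no state data. It does not show that state was not restored, because state values are bound to a specific key (the keyBy key).

Best,
Congxian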
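
To illustrate point 2, here is a minimal sketch in plain Java. It is not the Flink API: the class KeyedListStateModel and its methods are hypothetical names made up for this illustration, and the model only assumes that the runtime selects the current key before each record and that every key has its own list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of keyed list state. NOT the Flink API, only an illustration.
final class KeyedListStateModel {
    // One independent list per key: this is the scoping that keyed state has.
    private final Map<String, List<String>> statePerKey = new HashMap<>();
    private String currentKey;

    // Assumption: the runtime sets the current key before each record is processed.
    void setCurrentKey(String key) {
        this.currentKey = key;
    }

    // Appends a value to the list that belongs to the current key only.
    void add(String value) {
        statePerKey.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }

    // Returns a copy of the values stored for the current key only.
    List<String> get() {
        return new ArrayList<>(statePerKey.getOrDefault(currentKey, new ArrayList<>()));
    }
}

final class KeyedStateDemo {
    public static void main(String[] args) {
        KeyedListStateModel counts = new KeyedListStateModel();

        // Before the restart: a record with key "user-1" stores a value.
        counts.setCurrentKey("user-1");
        counts.add("id-1");

        // After the restart: the first record happens to have a different key,
        // so get() is empty even though the state for "user-1" still exists.
        counts.setCurrentKey("user-2");
        System.out.println("user-2 sees: " + counts.get());

        // A record with the original key sees the earlier value again.
        counts.setCurrentKey("user-1");
        System.out.println("user-1 sees: " + counts.get());
    }
}
```

In a real job the same reasoning suggests checking the "!!!" output only for a key that already had entries before the restart.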


sun <13...@qq.com> wrote on Friday, July 17, 2020 at 5:22 PM:

> Hello: I print the data in counts with the loop below:
>
> List<String> list = Lists.newArrayList(counts.get());
> for (String ss : list) {
>     System.out.println("!!!" + ss);
>     log.info("!!!" + ss);
> }
>
> However, after I restart the service, the entries that were stored earlier are no longer printed.
>
> @Slf4j
> public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {
>
>     private transient ListState<String> counts;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         // State entries expire 30 minutes after they are created or last written.
>         StateTtlConfig ttlConfig = StateTtlConfig
>                 .newBuilder(Time.minutes(30))
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>
>         ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
>         lastUserLogin.enableTimeToLive(ttlConfig);
>         counts = getRuntimeContext().getListState(lastUserLogin);
>     }
>
>     @Override
>     public void flatMap(String s, Collector<Test222> collector) throws Exception {
>         Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
>         });
>
>         System.out.println(DateUtil.toLongDateString(new Date()));
>         log.info(DateUtil.toLongDateString(new Date()));
>         // Append the record's id to the state of the current key, then print everything stored for that key.
>         counts.add(message.getId());
>         List<String> list = Lists.newArrayList(counts.get());
>         for (String ss : list) {
>             System.out.println("!!!" + ss);
>             log.info("!!!" + ss);
>         }
>         log.info(DateUtil.toLongDateString(new Date()));
>         System.out.println(DateUtil.toLongDateString(new Date()));
>     }
> }

Re: State cannot be restored from checkpoint

Posted by sun <13...@qq.com>.
Hello: I print the data in counts with the loop below:

List<String> list = Lists.newArrayList(counts.get());
for (String ss : list) {
    System.out.println("!!!" + ss);
    log.info("!!!" + ss);
}

However, after I restart the service, the entries that were stored earlier are no longer printed.

@Slf4j
public class FlatMapTestState extends RichFlatMapFunction<String, Test222> {

    private transient ListState<String> counts;

    @Override
    public void open(Configuration parameters) throws Exception {
        // State entries expire 30 minutes after they are created or last written.
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
        lastUserLogin.enableTimeToLive(ttlConfig);
        counts = getRuntimeContext().getListState(lastUserLogin);
    }

    @Override
    public void flatMap(String s, Collector<Test222> collector) throws Exception {
        Test222 message = JSONUtil.toObject(s, new TypeReference<Test222>() {
        });

        System.out.println(DateUtil.toLongDateString(new Date()));
        log.info(DateUtil.toLongDateString(new Date()));
        // Append the record's id to the state of the current key, then print everything stored for that key.
        counts.add(message.getId());
        List<String> list = Lists.newArrayList(counts.get());
        for (String ss : list) {
            System.out.println("!!!" + ss);
            log.info("!!!" + ss);
        }
        log.info(DateUtil.toLongDateString(new Date()));
        System.out.println(DateUtil.toLongDateString(new Date()));
    }
}

------------------ Original message ------------------
From: "user-zh" <qcx978132955@gmail.com>;
Sent: Thursday, July 16, 2020, 8:16 PM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Re: State cannot be restored from checkpoint



Hi

1. Could you describe the data loss in counts in more detail? What did you expect, and what did you actually observe?
2. Could you also post the rest of your code that uses counts?
3. Was your job restored from a checkpoint? You can check this in the JM log.
4. If you are sure that data was lost, you could use the State Processor API [1] to find out whether the problem is in serializing the state out or in restoring it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian



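Re: State cannot be restored from checkpoint

Posted by Congxian Qiu <qc...@gmail.com>.
Hi

1. Could you describe the data loss in counts in more detail? What did you expect, and what did you actually observe?
2. Could you also post the rest of your code that uses counts?
3. Was your job restored from a checkpoint? You can check this in the JM log.
4. If you are sure that data was lost, you could use the State Processor API [1] to find out whether the problem is in serializing the state out or in restoring it.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian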
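
For point 3, besides reading the JM log, one way to check whether there is a completed checkpoint to restore from at all is to look into the checkpoint directory. The following is a minimal sketch, assuming the usual layout in which each completed checkpoint is a chk-<n> directory containing a _metadata file below <checkpoint-dir>/<job-id>/. The class LatestCheckpointFinder is a hypothetical helper written for this illustration and is not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical helper, not part of Flink: finds the newest completed checkpoint
// directory below one job's checkpoint directory.
final class LatestCheckpointFinder {

    // Extracts n from a directory named chk-<n>.
    static long checkpointId(Path chkDir) {
        return Long.parseLong(chkDir.getFileName().toString().substring("chk-".length()));
    }

    // Returns the chk-<n> directory with the highest n that contains a _metadata file.
    // Directories without _metadata are skipped (assumed incomplete).
    static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("chk-\\d+"))
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: LatestCheckpointFinder <checkpoint-dir>/<job-id>");
            return;
        }
        Optional<Path> latest = findLatest(Paths.get(args[0]));
        System.out.println(latest.map(Path::toString).orElse("no completed checkpoint found"));
    }
}
```

The path it prints is the kind of path that can be passed to bin/flink run -s <path> ... to resume from a retained checkpoint. A job that is resubmitted without such a path starts with empty state.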


sun <13...@qq.com> wrote on Thursday, July 16, 2020 at 6:16 PM:

>
> Configuration code:
>
> env.enableCheckpointing(1000);
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> // Do not restart the job after a failure
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.getCheckpointConfig().setCheckpointTimeout(500);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
>
> Code that uses the state:
>
> private transient ListState<String> counts;
>
> @Override
> public void open(Configuration parameters) throws Exception {
>     StateTtlConfig ttlConfig = StateTtlConfig
>             .newBuilder(Time.minutes(30))
>             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build();
>
>     ListStateDescriptor<String> lastUserLogin = new ListStateDescriptor<>("lastUserLogin", String.class);
>     lastUserLogin.enableTimeToLive(ttlConfig);
>     counts = getRuntimeContext().getListState(lastUserLogin);
> }
>
> After I restarted the task managers, I found that all the data in counts was lost.