You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 刘海 <li...@163.com> on 2021/01/20 13:04:41 UTC

设置状态存储位置后,job运行起来后找不到状态数据

Hi all
小弟遇到个问题期望大佬解答解答:
通过         env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据,


flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到 “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢?


public class FlinkTestDemo {
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.getConfig().setAutoWatermarkInterval(200);
env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "6000");
configuration.setString("table.exec.mini-batch.size", "5000");

| |
刘海
|
|
liuhai35@163.com
|
签名由网易邮箱大师定制

Re: 设置状态存储位置后,job运行起来后找不到状态数据

Posted by zhisheng <zh...@gmail.com>.
你配置的是本地目录,不是 hdfs
目录,当重启后,可能新的任务运行的机器不是之前的那台机器了,那么之前作业的状态信息(在其他机器上)是不在新的机器上的,那么就会发现找不到状态文件,建议配置成
HDFS 的

Best
zhisheng

刘海 <li...@163.com> 于2021年1月20日周三 下午9:05写道:

> Hi all
> 小弟遇到个问题期望大佬解答解答:
> 通过         env.setStateBackend(new
> RocksDBStateBackend("file:///data/flink/checkpoints"));设置状态存储位置,job运行起来后找不到状态数据,
>
>
> flink1.12 yarn pre job 模式,下面是我的配置,job运行起来后在服务器上找不到
> “/data/flink/checkpoints”这个目录,像我设置了状态的存储位置是不是job一运行起来对应的存储位置就应该有状态的数据呢?
>
>
> public class FlinkTestDemo {
> public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60000);
> env.getConfig().setAutoWatermarkInterval(200);
> env.setStateBackend(new
> RocksDBStateBackend("file:///data/flink/checkpoints"));
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env,
> bsSettings);
>
> bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE);
> CheckpointConfig config = env.getCheckpointConfig();
>
> config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> bsTableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofMinutes(5));
>
> Configuration configuration = bsTableEnv.getConfig().getConfiguration();
> configuration.setString("table.exec.mini-batch.enabled", "true");
> configuration.setString("table.exec.mini-batch.allow-latency", "6000");
> configuration.setString("table.exec.mini-batch.size", "5000");
>
> | |
> 刘海
> |
> |
> liuhai35@163.com
> |
> 签名由网易邮箱大师定制