Posted to issues@flink.apache.org by "Underwood (Jira)" <ji...@apache.org> on 2022/06/15 11:24:00 UTC

[jira] [Comment Edited] (FLINK-27570) A checkpoint path error does not cause the job to stop

    [ https://issues.apache.org/jira/browse/FLINK-27570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554525#comment-17554525 ] 

Underwood edited comment on FLINK-27570 at 6/15/22 11:23 AM:
-------------------------------------------------------------

This time I ran a word-count job in IntelliJ IDEA and set the checkpoint storage to a path that does not exist. The behavior differs from what I reported before: the checkpoints are reported as successful, but no checkpoint files appear in that folder.

 
{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 0);
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.setInteger(RestOptions.PORT, 8081);

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.enableCheckpointing(1000);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointStorage(
        new FileSystemCheckpointStorage("file:///D/Desktop/Log/Flink/noDirectory"));
    checkpointConfig.setTolerableCheckpointFailureNumber(0);
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkpointConfig.setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.noRestart());
    env.setParallelism(1);
    
    DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);

    DataStream<Tuple2<String, Integer>> resultStream =
        inputDataStream
            .flatMap(new WordCount.MyFlatMapper())
            .setParallelism(1)
            .keyBy(0)
            .sum(1)
            .setParallelism(1);

    resultStream.print().setParallelism(1);

    env.execute();
  }
}
{code}
!image-2022-06-15-19-16-55-758.png!

!image-2022-06-15-19-20-41-916.png!

!image-2022-06-15-19-20-58-213.png!

!image-2022-06-15-19-21-24-740.png!
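
Until this is clarified, one possible workaround is to validate the checkpoint directory before the job is submitted. The following is only a minimal sketch under stated assumptions: it uses nothing but the JDK, it only handles file:// URIs, and `CheckpointPathPreflight` and `requireUsableCheckpointDir` are hypothetical names that are not part of Flink.

```java
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink: validates a local checkpoint directory up front.
class CheckpointPathPreflight {

    // Resolves a file:// URI to a local path and throws if it is not a writable directory.
    static Path requireUsableCheckpointDir(String checkpointUri) throws IOException {
        URI uri = URI.create(checkpointUri);
        if (!"file".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException(
                "This sketch only checks file:// URIs: " + checkpointUri);
        }
        Path dir = Paths.get(uri);
        if (!Files.isDirectory(dir)) {
            throw new IOException("Checkpoint directory does not exist: " + dir);
        }
        if (!Files.isWritable(dir)) {
            throw new IOException("Checkpoint directory is not writable: " + dir);
        }
        return dir;
    }

    public static void main(String[] args) {
        // Defaults to the path used in the job above; pass another URI as the first argument.
        String uri = args.length > 0 ? args[0] : "file:///D/Desktop/Log/Flink/noDirectory";
        try {
            System.out.println(
                "Checkpoint directory is usable: " + requireUsableCheckpointDir(uri));
        } catch (IOException e) {
            System.err.println("Refusing to start the job: " + e.getMessage());
        }
    }
}
```

Calling `requireUsableCheckpointDir(...)` before `env.execute()` would make the program fail before the job is submitted. It does not change how Flink itself treats the path.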


> A checkpoint path error does not cause the job to stop
> ------------------------------------------------------
>
>                 Key: FLINK-27570
>                 URL: https://issues.apache.org/jira/browse/FLINK-27570
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.14.4
>            Reporter: Underwood
>            Priority: Critical
>         Attachments: image-2022-05-11-16-12-11-818.png, image-2022-05-11-16-12-22-157.png, image-2022-05-11-16-13-20-709.png, image-2022-06-15-19-16-55-758.png, image-2022-06-15-19-20-41-916.png, image-2022-06-15-19-20-58-213.png, image-2022-06-15-19-21-24-740.png
>
>
> I configured the wrong checkpoint path when starting the job, and set:
> {code:java}
> conf.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 0);
> env.setRestartStrategy(RestartStrategies.noRestart());
> {code}
> I expected the job to stop because of the checkpoint error, but it keeps running.
>  
>  
> Here is my job configuration and environment:
> !image-2022-05-11-16-13-20-709.png!
> !image-2022-05-11-16-12-11-818.png!
> !image-2022-05-11-16-12-22-157.png!



--
This message was sent by Atlassian Jira
(v8.20.7#820007)