Posted to issues@flink.apache.org by "Underwood (Jira)" <ji...@apache.org> on 2022/06/15 11:24:00 UTC
[jira] [Comment Edited] (FLINK-27570) A checkpoint path error does not cause the job to stop
[ https://issues.apache.org/jira/browse/FLINK-27570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17554525#comment-17554525 ]
Underwood edited comment on FLINK-27570 at 6/15/22 11:23 AM:
-------------------------------------------------------------
This time I ran a word-count job in IntelliJ IDEA and configured a nonexistent checkpoint path. The behavior differs from what I saw before: the checkpoints are reported as successful, but no checkpoint files appear in that folder.
{code:java}
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // Tolerate zero checkpoint failures and expose the local web UI on port 8081.
        Configuration conf = new Configuration();
        conf.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 0);
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.setInteger(RestOptions.PORT, 8081);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(1000);

        // Point checkpoint storage at a directory that does not exist.
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage(
                new FileSystemCheckpointStorage("file:///D/Desktop/Log/Flink/noDirectory"));
        checkpointConfig.setTolerableCheckpointFailureNumber(0);
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // No restarts: any failure should stop the job.
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setParallelism(1);

        DataStream<String> inputDataStream = env.socketTextStream("localhost", 7777);
        // WordCount.MyFlatMapper is defined elsewhere in my project (not shown here).
        DataStream<Tuple2<String, Integer>> resultStream =
                inputDataStream
                        .flatMap(new WordCount.MyFlatMapper())
                        .setParallelism(1)
                        .keyBy(0)
                        .sum(1)
                        .setParallelism(1);
        resultStream.print().setParallelism(1);

        env.execute();
    }
} {code}
!image-2022-06-15-19-16-55-758.png!
!image-2022-06-15-19-20-41-916.png!
!image-2022-06-15-19-20-58-213.png!
!image-2022-06-15-19-21-24-740.png!
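To rule out the path itself before submitting the job, a pre-flight check along the following lines could help. This is only a minimal sketch: the class `CheckpointPathCheck` and the method `isUsableLocalDirectory` are hypothetical names, not part of Flink, and it only handles `file://` URIs on the machine where it runs. It says nothing about what Flink itself does with the path.

```java
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical pre-flight helper; not part of the Flink API. */
public class CheckpointPathCheck {

    /**
     * Returns true only if the URI uses the file scheme and points at an
     * existing, writable local directory. Any other scheme is rejected,
     * because this sketch cannot inspect remote file systems.
     */
    public static boolean isUsableLocalDirectory(String checkpointUri) {
        URI uri = URI.create(checkpointUri);
        if (!"file".equalsIgnoreCase(uri.getScheme())) {
            return false;
        }
        try {
            Path dir = Paths.get(uri);
            return Files.isDirectory(dir) && Files.isWritable(dir);
        } catch (IllegalArgumentException e) {
            // The URI is not a valid local path (for example, it has an authority part).
            return false;
        }
    }

    public static void main(String[] args) {
        // Same path as in the job above; the result depends on the local machine.
        String uri = "file:///D/Desktop/Log/Flink/noDirectory";
        System.out.println(uri + " usable: " + isUsableLocalDirectory(uri));
    }
}
```

Calling `isUsableLocalDirectory` before `env.execute()` and aborting when it returns false would make a mistyped local path fail fast, independently of how the checkpoint failure handling behaves.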
> A checkpoint path error does not cause the job to stop
> ------------------------------------------------------
>
> Key: FLINK-27570
> URL: https://issues.apache.org/jira/browse/FLINK-27570
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.14.4
> Reporter: Underwood
> Priority: Critical
> Attachments: image-2022-05-11-16-12-11-818.png, image-2022-05-11-16-12-22-157.png, image-2022-05-11-16-13-20-709.png, image-2022-06-15-19-16-55-758.png, image-2022-06-15-19-20-41-916.png, image-2022-06-15-19-20-58-213.png, image-2022-06-15-19-21-24-740.png
>
>
> I configured the wrong checkpoint path when starting the job, and set:
> {code:java}
> conf.set(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER, 0);
> env.setRestartStrategy(RestartStrategies.noRestart());
> {code}
> I expected the job to stop because of the checkpoint error, but it keeps running.
>
>
> Here is my job configuration and environment:
> !image-2022-05-11-16-13-20-709.png!
> !image-2022-05-11-16-12-11-818.png!
> !image-2022-05-11-16-12-22-157.png!
--
This message was sent by Atlassian Jira
(v8.20.7#820007)