Posted to issues@flink.apache.org by "Congxian Qiu(klion26) (Jira)" <ji...@apache.org> on 2020/01/02 08:00:20 UTC

[jira] [Commented] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning

    [ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006650#comment-17006650 ] 

Congxian Qiu(klion26) commented on FLINK-15152:
-----------------------------------------------

[~pnowojski] thanks for your reply.

Yes, it duplicates the logic from {{org.apache.flink.runtime.scheduler.SchedulerBase#triggerSavepoint}}.

I think we should restart the {{CheckpointCoordinator}} in the two cases below:
 # {{CheckpointCoordinator#triggerSynchronousSavepoint}} failed
 # stopping the job failed (even if the synchronous savepoint succeeded)

 

Regarding the new issues/extra complexity that might be introduced: IMO, the {{CheckpointCoordinator}} should be running whenever the job is not stopped, so I think restarting the {{CheckpointCoordinator}} is needed (a sketch follows below).

That is why I proposed the previous change.
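
To make the idea concrete, here is a minimal sketch of the restart logic, in the spirit of what {{SchedulerBase#triggerSavepoint}} already does for plain savepoints. It is an illustration only: {{terminationFuture}} and the helper {{triggerSynchronousSavepointInternal}} are assumed names, not code from an actual patch; the {{CheckpointCoordinator}} methods used are the existing stop/start scheduler ones.

{code:java}
// sketch only -- not the actual patch; "terminationFuture" and the
// synchronous-savepoint helper are assumed to exist in the scheduler
checkpointCoordinator.stopCheckpointScheduler();

final CompletableFuture<String> savepointFuture =
    triggerSynchronousSavepointInternal(advanceToEndOfEventTime, targetDirectory);

savepointFuture
    // case 2: even if the savepoint succeeded, stopping the job may still fail
    .thenCompose(path -> terminationFuture.thenApply(jobStatus -> path))
    .whenComplete((path, throwable) -> {
        // case 1 or case 2 failed: the job keeps running, so periodic
        // checkpointing has to be re-enabled
        if (throwable != null && checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            checkpointCoordinator.startCheckpointScheduler();
        }
    });
{code}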

> Job running without periodic checkpoint for stop failed at the beginning
> ------------------------------------------------------------------------
>
>                 Key: FLINK-15152
>                 URL: https://issues.apache.org/jira/browse/FLINK-15152
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.1
>            Reporter: Feng Jiajie
>            Priority: Critical
>              Labels: checkpoint, scheduler
>
> I have a streaming job configured with periodic checkpointing, but after a week of running I found there wasn't a single checkpoint file.
> h2. Reproduce the problem:
> 1. Job was submitted to YARN:
> {code:java}
> bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
> 2. Then immediately, before all the tasks had switched to RUNNING (a matter of seconds), I (actually a job control script) sent a "stop with savepoint" command via the Flink CLI:
> {code:java}
> bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
> {code}
> Log output in jobmanager.log:
> {code:java}
> 2019-12-09 17:56:56,512 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
> {code}
> Then the job's tasks (on the TaskManager) *continue to run normally without* checkpoints.
> h2. The cause of the problem:
> 1. "stop with savepoint" command call the code stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then triggerSynchronousSavepoint:
> {code:java}
> // we stop the checkpoint coordinator so that we are guaranteed
> // to have only the data of the synchronous savepoint committed.
> // in case of failure, and if the job restarts, the coordinator
> // will be restarted by the CheckpointCoordinatorDeActivator.
> checkpointCoordinator.stopCheckpointScheduler();{code}
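> The code that follows this call (paraphrased below, names and signatures approximate) chains the savepoint onto the job's termination; note that no step on this path ever calls startCheckpointScheduler() again if something fails:
> {code:java}
> // paraphrase of the surrounding stopWithSavepoint logic, for
> // illustration only -- not the exact source
> CompletableFuture<String> savepointFuture = checkpointCoordinator
>     .triggerSynchronousSavepoint(advanceToEndOfEventTime, targetDirectory)
>     .thenApply(CompletedCheckpoint::getExternalPointer);
> 
> // no exceptionally()/handle() branch restarts the checkpoint
> // scheduler when the savepoint fails
> return savepointFuture.thenCompose(
>     path -> terminationFuture.thenApply(jobStatus -> path));
> {code}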
> 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
> {code:java}
> LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
>   tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
>   job,
>   ExecutionState.RUNNING,
>   ee.getState());
> throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
> 3. Finally, "stop with savepoint" failed: checkpointCoordinator.stopCheckpointScheduler() had already been called, but the job was never terminated. The job keeps running, now without periodic checkpoints.
>  
> Sample code to reproduce:
> {code:java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.restartstrategy.RestartStrategies;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.contrib.streaming.state.PredefinedOptions;
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.CheckpointConfig;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> 
> import java.io.IOException;
> 
> public class StreamingJob {
>   private static StateBackend makeRocksdbBackend() throws IOException {
>     RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
>     rocksdbBackend.enableTtlCompactionFilter();
>     rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
>     return rocksdbBackend;
>   }
>   public static void main(String[] args) throws Exception {
>     // set up the streaming execution environment
>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     // 10 sec
>     env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
>     env.setStateBackend(makeRocksdbBackend());
>     env.setRestartStrategy(RestartStrategies.noRestart());
>     CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>     checkpointConfig.enableExternalizedCheckpoints(
>         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>     checkpointConfig.setFailOnCheckpointingErrors(true);
>     DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
>     text.map(new MapFunction<String, Tuple2<Long, Long>>() {
>       @Override
>       public Tuple2<Long, Long> map(String s) {
>         String[] s1 = s.split(" ");
>         return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
>       }
>     }).keyBy(0).flatMap(new CountWindowAverage()).print();
>     env.execute("Flink Streaming Java API Skeleton");
>   }
>   public static class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     private transient ValueState<Tuple2<Long, Long>> sum;
>     @Override
>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>       Tuple2<Long, Long> currentSum = sum.value();
>       currentSum.f0 += 1;
>       currentSum.f1 += input.f1;
>       sum.update(currentSum);
>       out.collect(new Tuple2<>(input.f0, currentSum.f1));
>     }
>     @Override
>     public void open(Configuration config) {
>       ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>           new ValueStateDescriptor<>(
>               "average", // the state name
>               TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
>               }), // type information
>               Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>       sum = getRuntimeContext().getState(descriptor);
>     }
>   }
> }
> {code}
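> To actually drive the job, something must already be listening on the socket before submission, e.g. {{nc -l 8912}} (or {{nc -l -p 8912}}, depending on the netcat variant), and each input line must contain two space-separated longs such as {{1 2}}.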



--
This message was sent by Atlassian Jira
(v8.3.4#803005)