Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2021/01/21 07:01:00 UTC

[jira] [Closed] (FLINK-21056) Streaming checkpointing is failing occasionally

     [ https://issues.apache.org/jira/browse/FLINK-21056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang closed FLINK-21056.
----------------------------
    Resolution: Invalid

> Streaming checkpointing is failing occasionally
> -----------------------------------------------
>
>                 Key: FLINK-21056
>                 URL: https://issues.apache.org/jira/browse/FLINK-21056
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.3
>         Environment: * streaming app
> * flink cluster in standalone-job / application mode
> * 1.11.3 Flink version
> * jobmanager --> 1 instance
> * taskmanager --> 1 instance
> * parallelism --> 2
>            Reporter: Nazar Volynets
>            Priority: Major
>         Attachments: jobmanager.log, taskmanager.log
>
>
> There is a simple streaming app with checkpointing enabled:
>  * statebackend --> RocksDB
>  * mode --> EXACTLY_ONCE
> Steps to reproduce:
>  1. Run the Flink cluster in standalone-job / application mode (with the embedded streaming app)
>  2. Wait 10 minutes
>  3. Restart the Flink cluster (and consequently the streaming app)
>  4. Repeat steps #1 to #3 until you get a checkpointing error
> {code:java|title=taskmanager}
> 2021-01-19 12:09:39,719 INFO  org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Could not complete snapshot 21 for operator Source: Custom Source -> Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 21 for operator Source: Custom Source -> Sink: Unnamed (1/2). Failure reason: Checkpoint was declined.
> ...
> Caused by: org.apache.flink.util.SerializedThrowable: Timeout expired after 60000milliseconds while awaiting InitProducerId
> {code}
> Based on the stack trace alone, it is quite tricky to determine the root cause.
> Please find below:
>  * streaming app code base (example)
>  * attached logs
>  ** jobmanager
>  ** taskmanager
> *Example*
> +App+
> {code:java|title=build.gradle (dependencies)}
> ...
> ext {
>   ...
>   javaVersion = '11'
>   flinkVersion = '1.12.0'
>   scalaBinaryVersion = '2.11'
>   ...
> }
> dependencies {
>   ...
>   implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
>   ...
> }
> {code}
> {code:java|title=App}
> public static void main(String[] args) throws Exception { // env.execute() throws a checked Exception
>   ...
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(2);
>   env.enableCheckpointing(10000);
>   env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
>   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>   env.getCheckpointConfig().setCheckpointTimeout(600000);
>   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>   FlinkKafkaConsumer<Record> consumer = createConsumer();
>   FlinkKafkaProducer<Record> producer = createProducer();
>   env
>     .addSource(consumer)
>     .uid("kafka-consumer")
>     .addSink(producer)
>     .uid("kafka-producer")
>   ;
>   env.execute();
> }
> public static FlinkKafkaConsumer<Record> createConsumer() {
>   ...
>   Properties props = new Properties();
>   props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
>   ... // nothing special
>   props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>   FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
>   ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   consumer.setStartFromGroupOffsets();
>   consumer.setCommitOffsetsOnCheckpoints(true);
>   return consumer;
> }
> public static FlinkKafkaProducer<Record> createProducer() {
>   ...
>   Properties props = new Properties();
>   ...
>   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
>   props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>   props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
>   props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
>   props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>   props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
>   props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
>   props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>   props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
>   props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
>   ...
>   FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>   ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   return producer;
> }
> {code}
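
The timing values in the report are spread across the checkpoint configuration, the producer properties, and the log line, so the following minimal sketch gathers them in one place for comparison. It is plain Java with no Flink or Kafka dependency. The class name `TimeoutSanityCheck` and the method `transactionOutlivesCheckpoint` are hypothetical, and the rule it checks (the transaction timeout should be longer than the checkpoint timeout) is an assumed rule of thumb for exactly-once sinks, not something the report states or something this code proves about the failure.

```java
import java.time.Duration;

public class TimeoutSanityCheck {
    // Values copied from the report; the constant names are illustrative only.
    static final Duration CHECKPOINT_INTERVAL = Duration.ofMillis(10_000);          // env.enableCheckpointing(10000)
    static final Duration CHECKPOINT_TIMEOUT = Duration.ofMillis(600_000);          // setCheckpointTimeout(600000)
    static final Duration TRANSACTION_TIMEOUT = Duration.ofMillis(15 * 60 * 1000);  // TRANSACTION_TIMEOUT_CONFIG
    static final Duration INIT_PRODUCER_ID_WAIT = Duration.ofMillis(60_000);        // from the taskmanager log line

    // Assumed rule of thumb: an open transaction should not expire
    // before a slow checkpoint has had the chance to complete.
    static boolean transactionOutlivesCheckpoint(Duration transactionTimeout, Duration checkpointTimeout) {
        return transactionTimeout.compareTo(checkpointTimeout) > 0;
    }

    public static void main(String[] args) {
        System.out.println("checkpoint interval (ms):    " + CHECKPOINT_INTERVAL.toMillis());
        System.out.println("checkpoint timeout (ms):     " + CHECKPOINT_TIMEOUT.toMillis());
        System.out.println("transaction timeout (ms):    " + TRANSACTION_TIMEOUT.toMillis());
        System.out.println("InitProducerId wait (ms):    " + INIT_PRODUCER_ID_WAIT.toMillis());
        System.out.println("transaction > checkpoint timeout: "
                + transactionOutlivesCheckpoint(TRANSACTION_TIMEOUT, CHECKPOINT_TIMEOUT));
    }
}
```

With the values from the report, the transaction timeout (15 minutes) is longer than the checkpoint timeout (10 minutes), so this particular check would not flag the configuration.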



--
This message was sent by Atlassian Jira
(v8.3.4#803005)