Posted to issues@flink.apache.org by "Yun Tang (Jira)" <ji...@apache.org> on 2021/01/21 07:02:00 UTC

[jira] [Updated] (FLINK-21057) Streaming checkpointing with small interval leads app to hang

     [ https://issues.apache.org/jira/browse/FLINK-21057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang updated FLINK-21057:
-----------------------------
    Component/s:     (was: Runtime / Checkpointing)
                 Connectors / Kafka

> Streaming checkpointing with small interval leads app to hang
> -------------------------------------------------------------
>
>                 Key: FLINK-21057
>                 URL: https://issues.apache.org/jira/browse/FLINK-21057
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.11.3
>         Environment: * streaming app
> * flink cluster in standalone-job / application mode
> * 1.11.3 Flink version
> * jobmanager --> 1 instance
> * taskmanager --> 1 instance
> * parallelism --> 2
>            Reporter: Nazar Volynets
>            Priority: Major
>         Attachments: jobmanager.log, taskmanager.log
>
>
> There is a simple streaming app with checkpointing enabled:
>  * statebackend --> RocksDB
>  * mode --> EXACTLY_ONCE
> Steps to reproduce:
>  1. Run Flink cluster in standalone-job / application mode (with embedded streaming app)
>  2. Get error
>  3. Wait 1 min
>  4. Stop Flink cluster
>  5. Repeat steps 1 to 3 until the following error appears:
> {code:java|title=taskmanager}
> org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
> flink-kafka-mirror-maker-jobmanager     | 	at java.lang.Thread.run(Unknown Source) ~[?:?]
> {code}
> It is obvious 
> Please find below:
>  * streaming app code base (example)
>  * attached logs
>  ** jobmanager
>  ** taskmanager
> *Example*
> +App+
> {code:java|title=build.gradle (dependencies)}
> ...
> ext {
>   ...
>   javaVersion = '11'
>   flinkVersion = '1.12.0'
>   scalaBinaryVersion = '2.11'
>   ...
> }
> dependencies {
>   ...
>   implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
>   implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
>   ...
> }
> {code}
> {code:java|title=App}
> public static void main(String[] args) throws Exception { // env.execute() declares a checked Exception
>   ...
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(2);
>   env.enableCheckpointing(500);
>   env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
>   env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>   env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
>   env.getCheckpointConfig().setCheckpointTimeout(600000);
>   env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>   FlinkKafkaConsumer<Record> consumer = createConsumer();
>   FlinkKafkaProducer<Record> producer = createProducer();
>   env
>     .addSource(consumer)
>     .uid("kafka-consumer")
>     .addSink(producer)
>     .uid("kafka-producer")
>   ;
>   env.execute();
> }
> public static FlinkKafkaConsumer<Record> createConsumer() {
>   ...
>   Properties props = new Properties();
>   props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
>   ... // nothing special
>   props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>   FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
>   ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   consumer.setStartFromGroupOffsets();
>   consumer.setCommitOffsetsOnCheckpoints(true);
>   return consumer;
> }
> public static FlinkKafkaProducer<Record> createProducer() {
>   ...
>   Properties props = new Properties();
>   ...
>   props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
>   props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
>   props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
>   props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
>   props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
>   props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
>   props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
>   props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>   props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
>   props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
>   ...
>   FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>   ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
>   ... // SimpleStringSchema --> can be used instead to reproduce issue
>   return producer;
> }
> {code}
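
A note on the checkpoint settings in the example above: the app asks for a 500 ms checkpoint interval but also sets a 1000 ms minimum pause between checkpoints. According to the Flink documentation, the minimum pause is measured from the completion of the previous checkpoint, so the effective interval cannot be shorter than the pause. The sketch below is a simplified model of that rule only, not Flink's scheduler code; the class name, the method name, and the 200 ms checkpoint duration are hypothetical values chosen for illustration.

```java
public class CheckpointTiming {

    /**
     * Simplified model: the next checkpoint starts no earlier than
     * (previous start + interval) and no earlier than
     * (previous completion + minimum pause). All values in milliseconds.
     */
    static long earliestNextStart(long prevStart, long prevDuration,
                                  long interval, long minPause) {
        long byInterval = prevStart + interval;
        long byPause = prevStart + prevDuration + minPause;
        return Math.max(byInterval, byPause);
    }

    public static void main(String[] args) {
        long interval = 500;        // from env.enableCheckpointing(500)
        long minPause = 1000;       // from setMinPauseBetweenCheckpoints(1000)
        long assumedDuration = 200; // hypothetical checkpoint duration

        long next = earliestNextStart(0, assumedDuration, interval, minPause);
        // prints "Earliest next checkpoint start: 1200 ms"
        System.out.println("Earliest next checkpoint start: " + next + " ms");
    }
}
```

Under this model the 500 ms interval never takes effect with these settings: the gap between checkpoint starts is at least 1000 ms plus the duration of the previous checkpoint. This says nothing about the cause of the reported Kafka error; it only clarifies how often checkpoints can be triggered in the example.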


