You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Nazar Volynets (Jira)" <ji...@apache.org> on 2021/01/20 23:36:00 UTC
[jira] [Created] (FLINK-21057) Streaming checkpointing with small
interval leads app to hang
Nazar Volynets created FLINK-21057:
--------------------------------------
Summary: Streaming checkpointing with small interval leads app to hang
Key: FLINK-21057
URL: https://issues.apache.org/jira/browse/FLINK-21057
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.11.3
Environment: * streaming app
* flink cluster in standalone-job / application mode
* 1.11.3 Flink version
* jobmanager --> 1 instance
* taskmanager --> 1 instance
* parallelism --> 2
Reporter: Nazar Volynets
Attachments: jobmanager.log, taskmanager.log
There is a simple streaming app with enabled checkpointing:
* statebackend --> RockDB
* mode --> EXACTLY_ONCE
STRs:
1. Run Flink cluster in standalone-job / application mode (with embedded streaming app)
2. Get error
3. Wait 1 min
4. Stop Flink cluster
4. Repeat steps from 1 to 3 util error :
{code:java|title=taskmanager}
org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
flink-kafka-mirror-maker-jobmanager | at java.lang.Thread.run(Unknown Source) ~[?:?]
{code}
It is obvious
Please find below:
* streaming app code base (example)
* attached logs
** jobmanager
** taskmanager
*Example*
+App+
{code:java|title=build.gradle (dependencies)}
...
ext {
...
javaVersion = '11'
flinkVersion = '1.12.0'
scalaBinaryVersion = '2.11'
...
}
dependencies {
...
implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
...
}
{code}
{code:java|title=App}
public static void main(String[] args) {
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.enableCheckpointing(500);
env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
FlinkKafkaConsumer<Record> consumer = createConsumer();
FlinkKafkaProducer<Record> producer = createProducer();
env
.addSource(consumer)
.uid("kafka-consumer")
.addSink(producer)
.uid("kafka-producer")
;
env.execute();
}
public static FlinkKafkaConsumer<Record> createConsumer() {
...
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
... // nothing special
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
... // SimpleStringSchema --> can be used instead to reproduce issue
consumer.setStartFromGroupOffsets();
consumer.setCommitOffsetsOnCheckpoints(true);
return consumer;
}
public static FlinkKafkaProducer<Record> createProducer() {
...
Properties props = new Properties();
...
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
...
FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
... // SimpleStringSchema --> can be used instead to reproduce issue
return producer;
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)