You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Tzu-Li (Gordon) Tai (JIRA)" <ji...@apache.org> on 2018/01/10 14:57:01 UTC
[jira] [Created] (FLINK-8409) Race condition in KafkaConsumerThread
leads to potential NPE
Tzu-Li (Gordon) Tai created FLINK-8409:
------------------------------------------
Summary: Race condition in KafkaConsumerThread leads to potential NPE
Key: FLINK-8409
URL: https://issues.apache.org/jira/browse/FLINK-8409
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.3.2, 1.4.0, 1.5.0
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Priority: Blocker
Fix For: 1.3.3, 1.5.0, 1.4.1
The following lines in the {{KafkaConsumerThread::setOffsetsToCommit(...)}} suggests a race condition with the asynchronous callback from committing offsets to Kafka:
{code}
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
this.offsetCommitCallback = commitCallback;
{code}
In the main consumer thread's main loop, {{nextOffsetsToCommit}} will be checked if there are any offsets to commit. If so, an asynchronous offset commit operation will be performed. The NPE happens in the case when the commit completes, but {{this.offsetCommitCallback = commitCallback;}} is not yet reached.
A possible fix is to make setting the next offsets to commit along with the callback instance a single atomic operation.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)