Posted to dev@kafka.apache.org by "Vinodhini (Jira)" <ji...@apache.org> on 2020/07/06 09:31:00 UTC
[jira] [Created] (KAFKA-10236) Kafka Streams | onCommit interceptor with EOS enabled
Vinodhini created KAFKA-10236:
---------------------------------
Summary: Kafka Streams | onCommit interceptor with EOS enabled
Key: KAFKA-10236
URL: https://issues.apache.org/jira/browse/KAFKA-10236
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.5.0
Reporter: Vinodhini
Coming from [https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream|https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream?]
*Background:*
Setting a consumer interceptor in {{StreamsConfig}} should ensure that the interceptor(s) are called whenever messages are consumed or committed. Snippet from {{org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync}}:
{code:java}
if (future.succeeded()) {
    if (interceptors != null)
        interceptors.onCommit(offsets);
    return true;
}
{code}
However, {{consumerInterceptor.onCommit()}} was never called, even though I could see the offsets being committed for the source topic.
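For context, the kind of setup described in this report might look roughly like the sketch below. It uses plain string keys so that it runs without the Kafka jars; the keys correspond to {{StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG)}} and {{StreamsConfig.PROCESSING_GUARANTEE_CONFIG}}. The interceptor class name, application id and bootstrap server are hypothetical placeholders, not values from the reporter's application.

```java
import java.util.Properties;

public class EosInterceptorConfig {
    // Hypothetical interceptor class name, used only for illustration.
    static final String INTERCEPTOR_CLASS = "com.example.CommitLoggingInterceptor";

    static Properties streamsProperties() {
        Properties props = new Properties();
        // Hypothetical application id and broker address.
        props.put("application.id", "example-app");
        props.put("bootstrap.servers", "localhost:9092");
        // "consumer." prefix routes the setting to the consumers created by Kafka Streams.
        props.put("consumer.interceptor.classes", INTERCEPTOR_CLASS);
        // Exactly-once processing: the setting under which onCommit was not observed.
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        streamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application these properties would be passed to the {{KafkaStreams}} constructor together with the topology.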
*Issue:*
I figured out that this was because I was using Kafka Streams with the exactly-once processing guarantee (EOS) enabled.
This is the logic in {{org.apache.kafka.streams.processor.internals.StreamTask#commit}}:
{code:java}
if (this.eosEnabled) {
    this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata,
        this.applicationId);
    this.producer.commitTransaction();
    if (startNewTransaction) {
        this.producer.beginTransaction();
    }
} else {
    this.consumer.commitSync(consumedOffsetsAndMetadata);
}
{code}
As you can see, with EOS enabled {{consumer.commitSync}} is never called. That call is what invokes {{consumerCoordinator.commit}}, which in turn calls {{interceptor.onCommit}}. Instead, the offsets are committed through the producer's transaction API, so the consumer interceptor is bypassed.
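The effect of the two branches can be illustrated with a small, self-contained model. This is only a sketch: {{ModelConsumer}}, {{ModelProducer}} and {{RecordingInterceptor}} are hypothetical stand-ins written for this example, not Kafka classes, and the model assumes that a successful {{commitSync}} always notifies the interceptor while the transactional path never does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CommitPathModel {
    /** Hypothetical stand-in for a consumer interceptor; records every onCommit call. */
    static class RecordingInterceptor {
        final List<Map<String, Long>> seen = new ArrayList<>();
        void onCommit(Map<String, Long> offsets) { seen.add(offsets); }
    }

    /** Hypothetical stand-in for the consumer: a successful commitSync notifies the interceptor. */
    static class ModelConsumer {
        final RecordingInterceptor interceptor;
        ModelConsumer(RecordingInterceptor interceptor) { this.interceptor = interceptor; }
        void commitSync(Map<String, Long> offsets) { interceptor.onCommit(offsets); }
    }

    /** Hypothetical stand-in for the transactional producer: no consumer interceptor involved. */
    static class ModelProducer {
        final List<Map<String, Long>> sentOffsets = new ArrayList<>();
        void sendOffsetsToTransaction(Map<String, Long> offsets, String groupId) { sentOffsets.add(offsets); }
        void commitTransaction() { /* nothing to do in this model */ }
    }

    /** Mirrors the branching quoted from StreamTask#commit in a simplified form. */
    static void commit(boolean eosEnabled, ModelConsumer consumer, ModelProducer producer,
                       Map<String, Long> offsets) {
        if (eosEnabled) {
            producer.sendOffsetsToTransaction(offsets, "example-app");
            producer.commitTransaction();
        } else {
            consumer.commitSync(offsets);
        }
    }

    /** Runs one commit and returns how many times the interceptor was notified. */
    static int interceptorCalls(boolean eosEnabled) {
        RecordingInterceptor interceptor = new RecordingInterceptor();
        Map<String, Long> offsets = Collections.singletonMap("source-topic-0", 42L);
        commit(eosEnabled, new ModelConsumer(interceptor), new ModelProducer(), offsets);
        return interceptor.seen.size();
    }

    public static void main(String[] args) {
        System.out.println("non-EOS onCommit calls: " + interceptorCalls(false)); // 1
        System.out.println("EOS onCommit calls: " + interceptorCalls(true));      // 0
    }
}
```

In this model the offsets still reach the producer on the EOS path, which matches the observation that offsets were committed for the source topic while the interceptor stayed silent.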
*Request:*
Provide a way to obtain the committed offsets from interceptors when EOS is enabled as well.