Posted to jira@kafka.apache.org by "Vinodhini (Jira)" <ji...@apache.org> on 2020/07/06 09:31:00 UTC

[jira] [Updated] (KAFKA-10236) Kafka Streams | onCommit interceptor with EOS enabled

     [ https://issues.apache.org/jira/browse/KAFKA-10236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinodhini updated KAFKA-10236:
------------------------------
    Description: 
Coming from [https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream|https://stackoverflow.com/questions/62700075/is-there-a-way-to-get-committed-offset-in-eos-kafka-stream?]

 

*Background:*

Setting a consumer interceptor in StreamsConfig ensures that the interceptor(s) are called when messages are consumed or committed. Snippet from {{org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsSync}}:

 
{code:java}
        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }{code}
 

However, {{consumerInterceptor.onCommit()}} was never called, even though I saw the offsets being committed for the source topic.
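
For context, the setup described in the Background can be sketched as plain configuration. This is a minimal sketch, not the reporter's actual configuration: the application id, bootstrap address, and interceptor class name are hypothetical placeholders. The key {{consumer.interceptor.classes}} is the string that results from applying the Streams consumer prefix to the consumer's {{interceptor.classes}} setting.

```java
import java.util.Properties;

public class InterceptorConfigSketch {
    // Hypothetical interceptor class name, used only for illustration.
    static final String INTERCEPTOR_CLASS = "com.example.OffsetLoggingInterceptor";

    // Builds Streams properties with EOS enabled and a consumer interceptor registered.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "eos-interceptor-demo");   // hypothetical value
        props.put("bootstrap.servers", "localhost:9092");      // hypothetical value
        // Exactly-once processing, as in the scenario reported here.
        props.put("processing.guarantee", "exactly_once");
        // "consumer." prefix routes the setting to the embedded consumer.
        props.put("consumer.interceptor.classes", INTERCEPTOR_CLASS);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the Streams application as usual; with this combination the interceptor is registered, but its commit callback is the part that goes missing.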

*Issue:*

I figured out that this was because I was using Kafka Streams with the exactly-once processing guarantee (EOS) enabled.

This is the logic in {{org.apache.kafka.streams.processor.internals.StreamTask#commit}}:

 
{code:java}
        if (this.eosEnabled) {
            this.producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata,
              this.applicationId);
            this.producer.commitTransaction();
            if (startNewTransaction) {
                this.producer.beginTransaction();
            }
        } else {
            this.consumer.commitSync(consumedOffsetsAndMetadata);
        }
{code}
As you can see, {{consumer.commitSync}}, which in turn calls {{consumerCoordinator.commit}}, which calls {{interceptor.onCommit}}, never gets called, because with EOS enabled it is the transaction API that gets invoked instead.
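
The two commit paths can be modelled with a small stand-alone sketch. This is a simplified illustration, not Kafka's code: {{CommitPathSketch}}, {{CommitListener}}, and {{callbacksSeen}} are hypothetical names, and the producer and consumer calls are only recorded as strings. It shows that the callback fires only on the non-EOS branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CommitPathSketch {
    // Hypothetical stand-in for the onCommit callback of a consumer interceptor.
    interface CommitListener {
        void onCommit(Map<String, Long> offsets);
    }

    // Mirrors the branch structure of the commit logic quoted above.
    // Returns the names of the calls made, in order.
    static List<String> commit(boolean eosEnabled,
                               Map<String, Long> offsets,
                               CommitListener listener) {
        List<String> calls = new ArrayList<>();
        if (eosEnabled) {
            // Offsets travel through the producer's transaction;
            // the consumer commit path is never entered.
            calls.add("producer.sendOffsetsToTransaction");
            calls.add("producer.commitTransaction");
        } else {
            // The consumer commit path is where the interceptor is notified.
            calls.add("consumer.commitSync");
            listener.onCommit(offsets);
            calls.add("interceptor.onCommit");
        }
        return calls;
    }

    // Counts how many times the listener is invoked for one commit.
    static int callbacksSeen(boolean eosEnabled) {
        int[] count = {0};
        Map<String, Long> offsets = Collections.singletonMap("source-topic-0", 42L);
        commit(eosEnabled, offsets, committed -> count[0]++);
        return count[0];
    }

    public static void main(String[] args) {
        System.out.println("callbacks without EOS: " + callbacksSeen(false));
        System.out.println("callbacks with EOS: " + callbacksSeen(true));
    }
}
```

In this model the offsets are committed in both cases, which matches the observation that offsets advance on the source topic while the interceptor stays silent.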

 

*Request* 

Provide a way to get the committed offsets from interceptors when EOS is enabled as well.


> Kafka Streams | onCommit interceptor with EOS enabled 
> ------------------------------------------------------
>
>                 Key: KAFKA-10236
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10236
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Vinodhini
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)