Posted to dev@kafka.apache.org by "Bill Sobel (JIRA)" <ji...@apache.org> on 2017/05/24 21:41:04 UTC

[jira] [Resolved] (KAFKA-2060) Async onCompletion callback may not be called

     [ https://issues.apache.org/jira/browse/KAFKA-2060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Sobel resolved KAFKA-2060.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 0.9.0.2

Per the notes, this was fixed in later versions of the Kafka client libraries.

> Async onCompletion callback may not be called
> ---------------------------------------------
>
>                 Key: KAFKA-2060
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2060
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.1.2
>         Environment: All
>            Reporter: Bill Sobel
>            Priority: Critical
>              Labels: easyfix
>             Fix For: 0.9.0.2
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The done() method in RecordBatch.java iterates over the thunks and calls each onCompletion() callback.  However, the call to thunk.future.get() can throw an exception.  When it does, the callback is not invoked.  This appears to be the only place where an async send would not get its callback, leaving the callback orphaned.
> The call to thunk.future.get() appears to need its own try/catch: if it does not throw, onCompletion() is called with the result; if it does, thunk.callback.onCompletion(null, recordException) is called instead.
> e.g.
>     /**
>      * Complete the request
>      * 
>      * @param baseOffset The base offset of the messages assigned by the server
>      * @param exception The exception that occurred (or null if the request was successful)
>      */
>     public void done(long baseOffset, RuntimeException exception) {
>         this.produceFuture.done(topicPartition, baseOffset, exception);
>         log.trace("Produced messages to topic-partition {} with base offset {} and error: {}.",
>                   topicPartition,
>                   baseOffset,
>                   exception);
>         // execute callbacks
>         for (int i = 0; i < this.thunks.size(); i++) {
>             try {
>                 Thunk thunk = this.thunks.get(i);
>                 if (exception == null) {
>                     RecordMetadata rc = null;
>                     try {
>                         rc = thunk.future.get();
>                     } catch (Exception recordException) {
>                         thunk.callback.onCompletion(null, recordException);
>                     }
>                     if (rc != null) {
>                         thunk.callback.onCompletion(rc, null);
>                     }
>                 } else {
>                     thunk.callback.onCompletion(null, exception);
>                 }
>             } catch (Exception e) {
>                 log.error("Error executing user-provided callback on message for topic-partition {}:", topicPartition, e);
>             }
>         }
>     }
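
The pattern proposed in the report can be tried outside Kafka with a small standalone sketch. Everything in it is an assumption made for illustration and is not Kafka code: the Callback interface, the Thunk class, and the use of a String in place of RecordMetadata are hypothetical stand-ins, and CompletableFuture plays the role of the per-record future. Unlike the snippet above, this version also invokes the callback when get() returns null, so every thunk gets exactly one call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class CallbackDispatchSketch {

    /** Hypothetical stand-in for the producer's Callback interface. */
    interface Callback {
        void onCompletion(String metadata, Exception exception);
    }

    /** Hypothetical stand-in for a thunk: one callback plus its per-record future. */
    static final class Thunk {
        final Callback callback;
        final Future<String> future;

        Thunk(Callback callback, Future<String> future) {
            this.callback = callback;
            this.future = future;
        }
    }

    /** Invokes every callback once, even when future.get() throws. */
    static void done(List<Thunk> thunks, RuntimeException batchException) {
        for (Thunk thunk : thunks) {
            try {
                if (batchException != null) {
                    thunk.callback.onCompletion(null, batchException);
                    continue;
                }
                String metadata;
                try {
                    metadata = thunk.future.get();
                } catch (Exception recordException) {
                    if (recordException instanceof InterruptedException) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag
                    }
                    // get() failed: report the failure instead of dropping the callback
                    thunk.callback.onCompletion(null, recordException);
                    continue;
                }
                thunk.callback.onCompletion(metadata, null);
            } catch (Exception e) {
                // A failure inside user code must not stop the remaining callbacks.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Callback recorder = (metadata, exception) ->
            events.add(exception == null ? "ok:" + metadata : "error");

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("lookup failed"));

        List<Thunk> thunks = new ArrayList<>();
        thunks.add(new Thunk(recorder, CompletableFuture.completedFuture("offset-0")));
        thunks.add(new Thunk(recorder, failed));
        thunks.add(new Thunk(recorder, CompletableFuture.completedFuture("offset-2")));

        done(thunks, null);
        System.out.println(events);
    }
}
```

Run as written, main prints [ok:offset-0, error, ok:offset-2]: the thunk whose future fails still receives its callback, and the thunks after it are unaffected.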


