Posted to jira@kafka.apache.org by "Aakash Gupta (Jira)" <ji...@apache.org> on 2020/08/31 23:21:00 UTC

[jira] [Comment Edited] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

    [ https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188042#comment-17188042 ] 

Aakash Gupta edited comment on KAFKA-2200 at 8/31/20, 11:20 PM:
----------------------------------------------------------------

Hi [~becket_qin] 
 I am willing to take this ticket. 

This is how exceptions are currently handled in the KafkaProducer.send() method:
{code:java}
catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);        
} catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);        
} catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;        
} catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            this.interceptors.onSendError(record, tp, e);
            throw e;        
}
{code}
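
To illustrate why the ApiException branch can disturb callback ordering, here is a minimal toy model. It does not use the Kafka client at all: ToyProducer, its failFast flag, and the record names are hypothetical, and the sketch only assumes that successful sends complete later (on flush) while a failing send invokes its callback immediately on the caller's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: ToyProducer is hypothetical and does not use the Kafka client.
class CallbackOrderSketch {

    /** Minimal stand-in for a producer that queues sends and completes them later. */
    static final class ToyProducer {
        private final List<Runnable> pending = new ArrayList<>();

        /**
         * Queues a record. If failFast is true, the callback is invoked immediately
         * on the caller's thread, mirroring the ApiException branch quoted above.
         */
        void send(String record, boolean failFast, Consumer<String> callback) {
            if (failFast) {
                callback.accept(record + ":error");
                return;
            }
            pending.add(() -> callback.accept(record + ":ok"));
        }

        /** Completes queued sends in order, as a background sender would. */
        void flush() {
            for (Runnable completion : pending) {
                completion.run();
            }
            pending.clear();
        }
    }

    /** Sends r1 (completes later), then r2 (fails inside send); returns the firing order. */
    static List<String> run() {
        List<String> fired = new ArrayList<>();
        ToyProducer producer = new ToyProducer();
        producer.send("r1", false, fired::add);
        producer.send("r2", true, fired::add);
        producer.flush();
        return fired;
    }

    public static void main(String[] args) {
        // r2's callback fires before r1's, although r1 was sent first.
        System.out.println(run()); // prints [r2:error, r1:ok]
    }
}
```

In this model the callback for r2 runs before the callback for r1, which is the ordering problem the ticket describes.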
 # TimeoutException while waiting for a metadata update: what is your suggestion? How should it be handled, if not through the ApiException callback path? As you mentioned, we are misusing TimeoutException, since the idea was to use it only where replication could not complete within the allowed time. Should we create a new exception, 'ClientTimeoutException', for such scenarios and also use it in the waitOnMetadata() method?
 # Message size validation throws RecordTooLargeException, which extends ApiException. As you pointed out, in this case the producer client throws RecordTooLargeException without ever interacting with the server.
You've suggested two scenarios that can cause exceptions:
 ## *The size of the serialised, uncompressed message is larger than maxRequestSize*: I'm not sure whether we can estimate the message size while taking the compression type into account. The current implementation therefore throws RecordTooLargeException based on an ESTIMATE that does not take the compression type into account. What is the expected behaviour in this case?
 ## *The message size is larger than totalMemorySize or memoryBufferSize*: the buffer pool would throw IllegalArgumentException when asked for the allocation. Should we just catch this exception, record it, and rethrow it?
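
As a rough illustration of the first size scenario, the sketch below compares a size check on the uncompressed payload with the same check on the compressed bytes. It is not the producer's actual estimate: the limit value, the payload, the helper names, and the choice of gzip are all assumptions for illustration, and record overhead is ignored.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPOutputStream;

// Sketch only: helper names and the limit are hypothetical, not Kafka client code.
class SizeEstimateSketch {

    /** Compresses the payload with gzip, standing in for any compression type. */
    static byte[] gzip(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** The check under discussion: is the given size above the configured limit? */
    static boolean exceedsLimit(int size, int maxRequestSize) {
        return size > maxRequestSize;
    }

    /** Builds a highly repetitive payload, which compresses very well. */
    static byte[] repetitivePayload(int length) {
        byte[] payload = new byte[length];
        Arrays.fill(payload, (byte) 'a');
        return payload;
    }

    public static void main(String[] args) {
        int maxRequestSize = 1_000;            // hypothetical limit
        byte[] payload = repetitivePayload(10_000);

        // Checked before compression, the record is rejected.
        System.out.println(exceedsLimit(payload.length, maxRequestSize));
        // Checked after compression, the same record would fit.
        System.out.println(exceedsLimit(gzip(payload).length, maxRequestSize));
    }
}
```

A check based on the uncompressed size can therefore reject a record that would fit once compressed, which is why the expected behaviour needs to be agreed on.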

[~becket_qin] Could you please answer the above queries and validate my understanding? Apologies if I've misunderstood something, as I am new to the Kafka community.


> kafkaProducer.send() should not call callback.onCompletion()
> ------------------------------------------------------------
>
>                 Key: KAFKA-2200
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2200
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0
>            Reporter: Jiangjie Qin
>            Priority: Major
>              Labels: newbie
>
> KafkaProducer.send() should not call callback.onCompletion() because this might break the callback firing order.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)