You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2015/02/13 21:04:11 UTC

[jira] [Commented] (KAFKA-1659) Ability to cleanly abort the KafkaProducer

    [ https://issues.apache.org/jira/browse/KAFKA-1659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14320673#comment-14320673 ] 

Guozhang Wang commented on KAFKA-1659:
--------------------------------------

One big use-case for this function call would be preserving ordering in the producer: say we have call send() for message 1, 2 and 3, when we get an error in the callback for message 2, we do not want the producer to continue sending 3 after it finishes this callback, and one way would be aborting the producer in the callback. Of course an even better solution would be an API such like drop() that cleans up the messages with the corresponding topic/partition in the buffer.

If we are pursuing the abort() function, since it may be called in the callback, killing the io-thread right away may not work appropriately.

An alternative approach would be adding an aborted flag to Sender, and have

{code}
public void abort() {
  this.sender.abort();
  this.close()
}
{code}

and in the Sender class, after the main loop check if aborted == true, if yes, clean up the buffer and return immediately; otherwise flush the buffer and return.

> Ability to cleanly abort the KafkaProducer
> ------------------------------------------
>
>                 Key: KAFKA-1659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1659
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, producer 
>    Affects Versions: 0.8.2.0
>            Reporter: Andrew Stein
>            Assignee: Jun Rao
>             Fix For: 0.8.3
>
>
> I would like the ability to "abort" the Java Client's KafkaProducer. This includes the stopping the writing of buffered records.
> The motivation for this is described [here|http://mail-archives.apache.org/mod_mbox/kafka-dev/201409.mbox/%3CCAOk4UxB7BJm6HSgLXrR01sksB2dOC3zdt0NHaKHz1EALR6%3DCTQ%40mail.gmail.com%3E].
> A sketch of this method is:
> {code}
> public void abort() {
>         try {
>             ioThread.interrupt();
>             ioThread.stop(new ThreadDeath());
>         } catch (IllegalAccessException e) {
>         }
> }
> {code}
> but of course it is preferable to stop the {{ioThread}} by cooperation, rather than use the deprecated {{Thread.stop(new ThreadDeath())}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)