Posted to dev@kafka.apache.org by "Ciprian Pascu (JIRA)" <ji...@apache.org> on 2016/11/02 09:56:58 UTC

[jira] [Created] (KAFKA-4365) In case async producer closes the TCP connection to Kafka broker, last sent messages might be lost.

Ciprian Pascu created KAFKA-4365:
------------------------------------

             Summary: In case async producer closes the TCP connection to Kafka broker, last sent messages might be lost.
                 Key: KAFKA-4365
                 URL: https://issues.apache.org/jira/browse/KAFKA-4365
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.0.1
            Reporter: Ciprian Pascu


I am using the kafka-python producer (https://github.com/dpkp/kafka-python). The producer is configured as async (acks=0) and sends a burst of, for example, 1000 messages. As the consumer I use either Logstash or the Kafka console consumer. Quite often the consumer receives fewer than 1000 messages. Checking the messages written to disk by the brokers also shows that not all of them were written. However, tcpdump and Wireshark show that all messages reached the brokers.
By adding some test logging to the Kafka code, I could see that the messages are added to the staged receives but not to the completed receives (org.apache.kafka.common.network.Selector class). I believe this happens because of the 'isMute' method in the classes implementing org.apache.kafka.common.network.TransportLayer: both of them also check that the 'key' is valid, which no longer holds once the TCP connection has been closed. Despite that, Kafka already holds those messages as staged receives, so it could still add them to the log; besides, since acks=0, no responses need to be sent.
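For anyone who wants to try reproducing this, the scenario above (a burst with acks=0, followed by the producer closing its connection) might look roughly like the sketch below. The helper names, broker address, topic name and message payloads are assumptions made for illustration, not taken from the original setup.

```python
def send_burst(producer, topic, count=1000):
    """Send `count` small messages, then close the producer right away.

    `producer` is expected to behave like a kafka.KafkaProducer that was
    created with acks=0, so no produce response is expected from the broker.
    """
    for i in range(count):
        producer.send(topic, ("message-%d" % i).encode("utf-8"))
    # flush() blocks until the buffered records have been sent; with acks=0
    # there is no broker acknowledgement to wait for.
    producer.flush()
    # Closing right after the burst tears down the TCP connections.
    producer.close()
    return count


def run_against_broker(bootstrap_servers="localhost:9092", topic="test-topic"):
    """Run the burst against a real broker (address and topic are assumed)."""
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, acks=0)
    return send_burst(producer, topic)
```

Calling run_against_broker() and then counting the messages with the console consumer should show whether any messages from the burst are missing.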
This issue is not visible with acks=1 (synchronous producer), or if the producer keeps the TCP connections to the brokers open permanently, or at least long enough for Kafka to actually write the logs to disk.
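As a rough illustration of the acks=1 case mentioned above, a client can wait for each acknowledgement before closing the connection. This is only a sketch: the helper names, broker address, topic name and timeout are assumptions, and it relies on the future returned by kafka-python's send() exposing get(timeout=...).

```python
def send_burst_acked(producer, topic, count=1000, timeout=10):
    """Send `count` messages and wait for every acknowledgement before closing.

    `producer` is expected to behave like a kafka.KafkaProducer that was
    created with acks=1.
    """
    futures = [
        producer.send(topic, ("message-%d" % i).encode("utf-8"))
        for i in range(count)
    ]
    # get() blocks until the record has been acknowledged, or raises on
    # failure or timeout, so the connection stays open until then.
    for future in futures:
        future.get(timeout=timeout)
    producer.close()
    return len(futures)


def run_acked_against_broker(bootstrap_servers="localhost:9092", topic="test-topic"):
    """Run the acknowledged burst against a real broker (values are assumed)."""
    # Imported lazily so the helper above works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap_servers, acks=1)
    return send_burst_acked(producer, topic)
```

The trade-off is latency: the client no longer fires and forgets, which is presumably why acks=0 was chosen in the first place.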
Proposed solution: remove the 'key.isValid()' check from the 'isMute' method in the SslTransportLayer and PlaintextTransportLayer classes (org.apache.kafka.common.network package).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)