Posted to jira@kafka.apache.org by "Suresh Chandran (Jira)" <ji...@apache.org> on 2020/01/31 19:00:00 UTC

[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant

    [ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027743#comment-17027743 ] 

Suresh Chandran commented on KAFKA-3686:
----------------------------------------

Hi,

Has this been fixed?

 

We have a setup with a cluster of 3 nodes (N1, N2, and N3) that also run a 3-node Kafka cluster. During our testing, we separate N1 from N2 and N3. Now, when we post some content to the producer on N1, it throws the following exception:

java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:112)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:101)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:86)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:135)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:135)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:134)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:134)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:133)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:117)
        at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:36)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:149)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)

 

Please note that the leader for the particular topic was originally on N2. While facing this issue, I would like to know how Kafka handles split-brain cases. If there are three nodes N1, N2, and N3, and N1 is separated from the others, can a producer post to the topic on N1 itself and keep working (ignoring the quorum aspect)? If N2 was the original leader of the topic, does that mean N1 can never work and will always be stuck?
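For context (an illustrative addition, not part of the original comment): whether writes can succeed on the minority side of a partition, and whether they survive once the partition heals, is largely governed by a few durability settings. A minimal sketch with assumed values:

{code}
// Illustrative sketch only -- the values below are assumptions, not taken from this thread.
Properties props = new Properties();
// Producer side: wait for acknowledgement from all in-sync replicas before confirming a send.
props.put("acks", "all");
// Broker / topic side (server.properties or per-topic overrides), shown here as comments:
//   min.insync.replicas=2                  -- reject writes when too few replicas are in sync
//   unclean.leader.election.enable=false   -- an out-of-sync replica cannot become leader
{code}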

 

Thanks

Suresh

> Kafka producer is not fault tolerant
> ------------------------------------
>
>                 Key: KAFKA-3686
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3686
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.9.0.1
>            Reporter: Luca Bruno
>            Priority: Major
>
> *Setup*
> I have a cluster of 3 Kafka servers, a topic with 12 partitions and replication factor 2, and a ZooKeeper cluster of 3 nodes.
> Producer config:
> {code}
>  props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
>  props.put("acks", "1");
>  props.put("batch.size", 16384);
>  props.put("retries", 3);
>  props.put("buffer.memory", 33554432);
>  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
>  Producer<String, String> producer = new KafkaProducer<>(props);
>  for(int i = 0; i < 10; i++) {
>      Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>      f.get();
>  }
> {code}
> *Problem*
> Cut the network between the producer (p1) and one of the kafka servers (say k1).
> The cluster is healthy, hence the Kafka bootstrap tells the producer that there are 3 Kafka servers (as I understood it), along with the leaders of the topic's partitions.
> So the producer sends messages to the leaders of all the partitions, spread across the 3 servers. If the leader of a message's partition happens to be k1, the producer raises the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
> 	at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
> 	at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
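> To see which broker the producer's metadata reports as the leader of each partition (using the producer from the snippet above), a small diagnostic sketch, not from the original report, can print it:
> {code}
> // Sketch: print the partition leaders as reported by the producer's metadata.
> for (org.apache.kafka.common.PartitionInfo p : producer.partitionsFor("topic"))
>     System.out.println("partition " + p.partition() + " -> leader " + p.leader());
> {code}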
> In theory, the application should handle the failure. In practice, messages are getting lost, even though the other 2 leaders are still available for writing.
> I tried with acks set to both 1 and -1.
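> One way an application can work around this today is to catch the timeout and re-send the record to an explicitly chosen partition, hoping its leader is a different, reachable broker. A rough sketch (the method and flow are illustrative, not from the original report):
> {code}
> import java.util.concurrent.ExecutionException;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.errors.TimeoutException;
>
> // Sketch: retry a timed-out send against other partitions explicitly,
> // so a different partition leader gets a chance to accept the record.
> static void sendWithFailover(Producer<String, String> producer,
>                              String topic, String value) throws Exception {
>     try {
>         producer.send(new ProducerRecord<String, String>(topic, null, value)).get();
>         return;
>     } catch (ExecutionException e) {
>         if (!(e.getCause() instanceof TimeoutException)) throw e;
>     }
>     for (PartitionInfo p : producer.partitionsFor(topic)) {
>         try {
>             producer.send(new ProducerRecord<String, String>(topic, p.partition(), null, value)).get();
>             return; // another leader accepted the record
>         } catch (ExecutionException e) {
>             if (!(e.getCause() instanceof TimeoutException)) throw e;
>         }
>     }
>     throw new TimeoutException("no reachable partition leader for topic " + topic);
> }
> {code}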
> *What I expected*
> Given that the client library automatically decides the hashing / round-robin scheme for choosing the partition, I would say it's not very important which partition a message is sent to.
> I expect the client library to handle the failure by sending the message to a partition with a different leader.
> Neither kafka-clients nor rdkafka handles this failure. Given that those are the main client libraries used with Kafka, as far as I know, I find this a serious problem in terms of fault tolerance.
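> As a stop-gap, an application can approximate this itself through the pluggable partitioner.class hook: a custom Partitioner that avoids partitions whose leader the application has marked as unreachable. A sketch, not from the original report (the blacklist and class name are hypothetical, not an existing API):
> {code}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ThreadLocalRandom;
> import org.apache.kafka.clients.producer.Partitioner;
> import org.apache.kafka.common.Cluster;
> import org.apache.kafka.common.PartitionInfo;
>
> // Sketch: prefer partitions whose leader is not on an application-maintained
> // blacklist of unreachable brokers (filled in, e.g., from send() callbacks).
> public class LeaderAwarePartitioner implements Partitioner {
>     private static final Set<Integer> unreachable = ConcurrentHashMap.newKeySet();
>
>     public static void markUnreachable(int brokerId) { unreachable.add(brokerId); }
>
>     @Override public void configure(Map<String, ?> configs) {}
>
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes,
>                          Object value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> all = cluster.partitionsForTopic(topic);
>         List<PartitionInfo> reachable = new ArrayList<PartitionInfo>();
>         for (PartitionInfo p : all)
>             if (p.leader() != null && !unreachable.contains(p.leader().id()))
>                 reachable.add(p);
>         if (reachable.isEmpty())
>             reachable = all; // nothing better known; fall back to default behaviour
>         return reachable.get(ThreadLocalRandom.current().nextInt(reachable.size())).partition();
>     }
>
>     @Override public void close() {}
> }
> {code}
> It would be registered via props.put("partitioner.class", LeaderAwarePartitioner.class.getName()). Note this only redirects new records; it does not recover records that have already expired in the accumulator.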
> EDIT: I cannot add comments to this issue; I don't understand why. To answer [~fpj]: yes, I want the first. In the case of network partitions I want to ensure my messages are stored. If the libraries don't do that, it means I have to reimplement them, or otherwise postpone sending such messages until the network partition resolves (which means implementing some kind of on-disk backlog in the producer, which should be Kafka's purpose after all). In both cases, it's something that is not documented and it's very inconvenient.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)