Posted to users@kafka.apache.org by Shrikant Patel <SP...@pdxinc.com> on 2017/04/18 23:10:06 UTC

ZK and Kafka failover testing

Hi All,

I am seeing strange behavior between ZK and Kafka. We have 5 nodes each in the ZK and Kafka clusters. Kafka version - 2.11-0.10.1.1

The min.insync.replicas is 3, replication.factor is 5 for all topics, unclean.leader.election.enable is false. We have 15 partitions for each topic.
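
As a rough illustration of how these settings interact: with acks=all, a partition leader rejects a write once its in-sync replica set (ISR) shrinks below min.insync.replicas. The sketch below only models that arithmetic; the function names are made up for illustration and are not part of any Kafka API.

```python
def accepts_acks_all_write(isr_size: int, min_insync_replicas: int) -> bool:
    """True if a leader would accept an acks=all write with this ISR size."""
    return isr_size >= min_insync_replicas


def tolerated_replica_losses(replication_factor: int, min_insync_replicas: int) -> int:
    """Replicas that can drop out of the ISR before acks=all writes are rejected."""
    return replication_factor - min_insync_replicas


# Values from this thread: replication.factor=5, min.insync.replicas=3.
print(tolerated_replica_losses(5, 3))
print(accepts_acks_all_write(isr_size=4, min_insync_replicas=3))
print(accepts_acks_all_write(isr_size=2, min_insync_replicas=3))
```

With one broker down the ISR would have 4 members, which is still above the configured minimum of 3.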

The steps we are following in our testing:


*         My understanding is that ZK needs at least 3 out of 5 servers to be functional, and that Kafka cannot function without ZooKeeper. In our testing, we bring down 3 ZK nodes and don't touch the Kafka nodes. Kafka is still functional; consumers/producers can still consume/publish from the Kafka cluster. We then bring down all ZK nodes, and the Kafka consumers/producers are still functional. I am not able to understand why the Kafka cluster is not failing as soon as the majority of ZK nodes are down. I do see errors in Kafka saying that it cannot connect to the ZK cluster.



*         With all or a majority of the ZK nodes down, we bring down 1 Kafka node (out of 5, so 4 are running). At that point the consumer and producer start failing. My guess is that a new leader election cannot happen without ZK.



*         Then we bring a majority of the ZK nodes back up (the 1st Kafka node is still down). Now the Kafka cluster becomes functional, and the consumer and producer start working again. But the consumer sees a big chunk of messages from Kafka, and many of them are duplicates. It's as if these messages were held up somewhere; where and why, I don't know. And why the duplicates? I can understand a few duplicates for messages that the consumer could not commit before the 1st node went down. But why so many duplicates, like 4 copies of each message? I cannot understand this behavior.
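
The majority rule assumed in the first step can be written down as a small Python sketch. It only models the quorum arithmetic for a ZooKeeper ensemble; the function names are illustrative and it says nothing about how Kafka brokers react when the quorum is lost.

```python
def zk_quorum_size(ensemble_size: int) -> int:
    """Smallest number of servers that forms a strict majority."""
    return ensemble_size // 2 + 1


def has_quorum(ensemble_size: int, servers_up: int) -> bool:
    """True if enough servers are up for the ensemble to serve requests."""
    return servers_up >= zk_quorum_size(ensemble_size)


# The 5-node ensemble from this test: 3 servers are needed,
# so taking down 3 of the 5 leaves only 2 and the quorum is lost.
print(zk_quorum_size(5))
print(has_quorum(5, servers_up=3))
print(has_quorum(5, servers_up=2))
```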

Appreciate some insight about our issues. Also if there are blogs that describe the ZK and Kafka failover scenario behaviors, that would be extremely helpful.

Thanks,
Shri

This e-mail and its contents (to include attachments) are the property of National Health Systems, Inc., its subsidiaries and affiliates, including but not limited to Rx.com Community Healthcare Network, Inc. and its subsidiaries, and may contain confidential and proprietary or privileged information. If you are not the intended recipient of this e-mail, you are hereby notified that any unauthorized disclosure, copying, or distribution of this e-mail or of its attachments, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, please immediately notify the sender by telephone at 800-433-5719 or return e-mail and permanently delete the original e-mail.

Re: Re: Re: ZK and Kafka failover testing

Posted by Jun Rao <ju...@confluent.io>.
Hi, Shri,

So, it seems that the duplicates are introduced by the publisher.
Currently, producer retries can introduce duplicates. We are trying to
eliminate that in the EOS work (KIP-98). We plan to have a beta version of
that available in 0.11.0.
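
To make the retry point concrete, here is a toy Python simulation. It is a sketch under simplifying assumptions, not Kafka code: the "broker" appends every request it receives, some responses never reach the client, and the client retries. The optional dedupe flag loosely mimics the sequence-number idea behind KIP-98; ToyBroker, send and all parameter names are invented for illustration.

```python
class ToyBroker:
    """Appends every request it receives, but may lose the response."""

    def __init__(self, lost_responses, dedupe=False):
        self.log = []
        self.lost_responses = lost_responses  # how many acks never reach the client
        self.dedupe = dedupe                  # True: ignore repeats of a known sequence number
        self.seen = set()

    def handle_produce(self, seq, value):
        if not (self.dedupe and seq in self.seen):
            self.log.append(value)
            self.seen.add(seq)
        if self.lost_responses > 0:
            self.lost_responses -= 1
            raise ConnectionError("disconnected before a response was received")


def send(broker, seq, value, retries):
    """Return True if any attempt was acknowledged."""
    for _ in range(retries + 1):  # first attempt plus `retries` retries
        try:
            broker.handle_produce(seq, value)
            return True
        except ConnectionError:
            continue
    return False


# Every response is lost, so the client reports a failure after retries=3,
# yet the log of the plain broker holds one copy per attempt.
plain = ToyBroker(lost_responses=4)
acked = send(plain, seq=0, value="value 35", retries=3)
print(acked, len(plain.log))

deduping = ToyBroker(lost_responses=4, dedupe=True)
send(deduping, seq=0, value="value 35", retries=3)
print(len(deduping.log))
```

Whether the duplicates in this particular test arose exactly this way is not established by the simulation; it only shows that a failed send and multiple stored copies are not contradictory.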

Hi, Jeff,

We have a known issue of recovering from ZK session expiration in the
controller. The controller code is getting a bit complicated and is the
source of several corner case bugs. We have started working on controller
improvement and will get to that particular issue in the next few months.
You can follow the progress in KAFKA-5027.

Thanks,

Jun


On Wed, Apr 19, 2017 at 3:48 PM, Shrikant Patel <SP...@pdxinc.com> wrote:

> Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your responses.
>
> Just to summarize my steps briefly (5-node Kafka and ZK clusters):
> 1. The ZK cluster has all nodes working. The consumer is down.
> 2. Bring down a majority of the ZK nodes.
> 3. Things are functional, no issues (no duplicate or lost messages).
> 4. Now the first Kafka node comes down.
> 5. My issue starts happening - as you see below, the producer says the
> messages with keys 34 and 35 failed.
> 6. Bring a majority of the ZK nodes up.
> 7. The other Kafka nodes assume leadership for node 1's partitions.
> 8. Bring the consumer up; it starts consuming from the last offset and I see
> duplicates. I see message 34 (3 times) and 35 (4 times).
>
>
> Jeff, in my case I don't see an issue with the Kafka cluster recovering; once
> a majority of the ZK nodes are up, the other Kafka brokers take over
> leadership for the down node immediately.
> Onur, as Jun mentioned, since I have acks=all, I am not seeing any messages
> being lost.
>
> Jun, Hans, I had the same thought of trying to rule out the consumer getting
> duplicates because of incorrectly acking messages. In the next run of this
> test case, I did not run the consumer at all until the very end. My consumer
> and producer properties are in my earlier email in this thread. As I
> understand it, a RetriableException indicates a temporary issue, and I would
> like to retry to see if the issue resolves itself, hence the producer has
> retries=3.
>
> Producer log
>
> ******************* Publisher #  Paritition - 12 Key - 26 Value - value 26
> ******************* Publisher #  Paritition - 13 Key - 27 Value - value 27
> ******************* Publisher #  Paritition - 14 Key - 28 Value - value 28
> ******************* Publisher #  Paritition - 0 Key - 29 Value - value 29
> ******************* Publisher #  Paritition - 1 Key - 30 Value - value 30
> ******************* Publisher #  Paritition - 2 Key - 31 Value - value 31
> ******************* Publisher #  Paritition - 3 Key - 32 Value - value 32
> ******************* Publisher #  Paritition - 4 Key - 33 Value - value 33
> ******************* Publisher #  Paritition - 5 Key - 34 Value - value 34
> 2017-04-19 16:39:08.008  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:39:39.128  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:10.271  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5:
> org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> 2017-04-19 16:42:50.621  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Failed to publish  Paritition - 5 Key - 34 Value - value 34
> java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> 2017-04-19 16:42:51.001  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Publisher #  Paritition - 6 Key - 35 Value - value 35
> 2017-04-19 16:43:21.010  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:43:52.152  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:44:23.234  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='35' and payload='value 35' to topic ${topic-name} and partition 6:
> org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>
> Consumer log (consumer only started at the very end of the test scenario)
> value 21
> value 22
> value 23
> value 24
> value 25
> value 26
> value 27
> value 28
> value 29
> value 30
> value 31
> value 32
> value 33
> value 34
> value 34
> value 34
> value 35
> value 35
> value 35
> value 35
>
> Output of describe command at point 1.
>
> Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
>         Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 5,1,2,3,4
>
> (since majority ZK are down at point 6 my describe command does not work)
> Output of describe command at point 2.
>
> Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
>         Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 2,3,4,5
>         Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
>         Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 4,2,5,3
>         Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 3,4,2,5
>         Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 2,5,3,4
>         Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 4,5,2,3
>         Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
>         Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 2,3,5,4
>         Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 2,4,5,3
>         Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 3,4,2,5
>         Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,2,3,4
>         Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
>         Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 4,3,5,2
>         Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,3,2,4
>         Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 4,2,5,3
>
> Thanks,
> Shri
>
>
> -----Original Message-----
> From: Jeff Widman [mailto:jeff@netskope.com]
> Sent: Wednesday, April 19, 2017 4:11 PM
> To: users@kafka.apache.org
> Subject: [EXTERNAL] Re: Re: ZK and Kafka failover testing
>
> ***** Notice: This email was received from an external source *****
>
> Oops, I linked to the wrong ticket, this is the one we hit:
> https://issues.apache.org/jira/browse/KAFKA-3042
>
> On Wed, Apr 19, 2017 at 1:45 PM, Jeff Widman <je...@netskope.com> wrote:
>
> >
> >
> >
> >
> >
> > *As Onur explained, if ZK is down, Kafka can still work, but won't be
> > able to react to actual broker failures until ZK is up again. So if a
> > broker is down in that window, some of the partitions may not be ready
> > for read or
> > write.*
> > We had a production scenario where ZK had a long GC pause and Kafka
> > lost connection temporarily. The brokers kept sending data just fine
> > for existing topics. However, when ZK came back, the kafka cluster
> > could not recover gracefully because of this issue:
> > https://issues.apache.org/jira/browse/KAFKA-2729
> > IIRC, in our case, the mismatched cached data was the controller
> > generation: the generations recorded in zookeeper for the partition
> > leaders did not match the generation id listed in the controller znode.
> > Manually forcing a controller re-election solved this because it
> > brought all generation IDs in sync. However, it would have been nice
> > if the cluster had been able to automatically do the controller
> > re-election without waiting for manual intervention.
> >
> > On Wed, Apr 19, 2017 at 1:30 PM, Jun Rao <ju...@confluent.io> wrote:
> >
> >> Hi, Shri,
> >>
> >> As Onur explained, if ZK is down, Kafka can still work, but won't be
> >> able to react to actual broker failures until ZK is up again. So if a
> >> broker is down in that window, some of the partitions may not be
> >> ready for read or write.
> >>
> >> As for the duplicates in the consumer, Hans had a good point. It
> >> would be useful to see if the duplicates are introduced by the
> >> producer or the consumer. Perhaps you can read the log again and see
> >> if duplicates are in the log in the first place. Note that producer
> >> retries can introduce duplicates.
> >>
> >> Hi, Onur,
> >>
> >> For the data loss issue that you mentioned, that should only happen
> >> with acks=1. As we discussed offline, if acks=all is used and unclean
> >> leader election is disabled, acked messages shouldn't be lost.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
> >>
> >> > If this is what I think it is, it has nothing to do with acks,
> >> > max.in.flight.requests.per.connection, or anything client-side and
> >> > is purely about the kafka cluster.
> >> >
> >> > Here's a simple example involving a single zookeeper instance, 3 brokers,
> >> > a KafkaConsumer and KafkaProducer (neither of these clients interacts
> >> > with zookeeper).
> >> > 1. start up zookeeper:
> >> > > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> >> >
> >> > 2. start up some brokers:
> >> > > ./bin/kafka-server-start.sh config/server0.properties
> >> > > ./bin/kafka-server-start.sh config/server1.properties
> >> > > ./bin/kafka-server-start.sh config/server2.properties
> >> >
> >> > 3. create a topic:
> >> > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3
> >> >
> >> > 4. start a console consumer (this needs to happen before step 5 so we can
> >> > write __consumer_offsets metadata to zookeeper):
> >> > > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t
> >> >
> >> > 5. kill zookeeper
> >> >
> >> > 6. start a console producer and produce some messages:
> >> > > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t
> >> >
> >> > 7. notice the size of the broker logs grow with each message you send:
> >> > > ls -l /tmp/kafka-logs*/t-0
> >> >
> >> > 8. notice the consumer consuming the messages being produced
> >> >
> >> > Basically, zookeeper can be completely offline and your brokers will
> >> > append to logs and process client requests just fine as long as they
> >> > don't need to interact with zookeeper. Today, the only way a broker
> >> > knows to stop accepting requests is when it receives instruction from
> >> > the controller.
> >> >
> >> > I first realized this last July when debugging a small production data
> >> > loss scenario that was a result of this[1]. Maybe this is an attempt at
> >> > leaning towards availability over consistency. Personally I think that
> >> > brokers should stop accepting requests when they disconnect from
> >> > zookeeper.
> >> >
> >> > [1] The small production data loss scenario happens when accepting
> >> > requests during the small window in between a broker's zookeeper session
> >> > expiration and when the controller instructs the broker to stop
> >> > accepting requests.
> >> > During this time, the broker still thinks it leads partitions that
> >> > are currently being led by another broker, effectively resulting in
> >> > a window where the partition is led by two brokers. Clients can
> >> > continue sending requests to the old leader, and for producers with
> >> > low acknowledgement settings (like acks=1), their messages will be
> >> > lost without the client knowing, as the messages are being appended
> >> > to the phantom leader's logs instead of the true leader's logs.
> >> >
> >> > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com> wrote:
> >> >
> >> > > While we were testing, our producer had the following configuration:
> >> > > max.in.flight.requests.per.connection=1, acks=all and retries=3.
> >> > >
> >> > > The entire set of producer-side settings is below. The consumer uses
> >> > > manual offset commit; it commits the offset after it has successfully
> >> > > processed the message.
> >> > >
> >> > > Producer setting
> >> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
> >> > > key.serializer= {appropriate value as per your case}
> >> > > value.serializer= {appropriate value as per your case}
> >> > > acks=all
> >> > > retries=3
> >> > > ssl.key.password= {appropriate value as per your case}
> >> > > ssl.keystore.location= {appropriate value as per your case}
> >> > > ssl.keystore.password= {appropriate value as per your case}
> >> > > ssl.truststore.location= {appropriate value as per your case}
> >> > > ssl.truststore.password= {appropriate value as per your case}
> >> > > batch.size=16384
> >> > > client.id= {appropriate value as per your case, may help with debugging}
> >> > > max.block.ms=65000
> >> > > request.timeout.ms=30000
> >> > > security.protocol=SSL
> >> > > ssl.enabled.protocols=TLSv1.2
> >> > > ssl.keystore.type=JKS
> >> > > ssl.protocol=TLSv1.2
> >> > > ssl.truststore.type=JKS
> >> > > max.in.flight.requests.per.connection=1
> >> > > metadata.fetch.timeout.ms=60000
> >> > > reconnect.backoff.ms=1000
> >> > > retry.backoff.ms=1000
> >> > > max.request.size=1048576
> >> > > linger.ms=0
> >> > >
> >> > > Consumer setting
> >> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
> >> > > key.deserializer= {appropriate value as per your case}
> >> > > value.deserializer= {appropriate value as per your case}
> >> > > group.id= {appropriate value as per your case}
> >> > > ssl.key.password= {appropriate value as per your case}
> >> > > ssl.keystore.location= {appropriate value as per your case}
> >> > > ssl.keystore.password= {appropriate value as per your case}
> >> > > ssl.truststore.location= {appropriate value as per your case}
> >> > > ssl.truststore.password= {appropriate value as per your case}
> >> > > enable.auto.commit=false
> >> > > security.protocol=SSL
> >> > > ssl.enabled.protocols=TLSv1.2
> >> > > ssl.keystore.type=JKS
> >> > > ssl.protocol=TLSv1.2
> >> > > ssl.truststore.type=JKS
> >> > > client.id= {appropriate value as per your case, may help with debugging}
> >> > > reconnect.backoff.ms=1000
> >> > > retry.backoff.ms=1000
> >> > >
> >> > > Thanks,
> >> > > Shri
> >> > >
> >> > > -----Original Message-----
> >> > > From: Hans Jespersen [mailto:hans@confluent.io]
> >> > > Sent: Tuesday, April 18, 2017 7:57 PM
> >> > > To: users@kafka.apache.org
> >> > > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
> >> > >
> >> > >
> >> > > When you publish, is acks=0, 1, or all (-1)?
> >> > > What is max.in.flight.requests.per.connection (default is 5)?
> >> > >
> >> > > It sounds to me like your publishers are using acks=0, so they are
> >> > > not actually succeeding in publishing (i.e. you are getting no acks),
> >> > > but they will retry over and over and will have up to 5 retries in
> >> > > flight, so when the broker comes back up, you are getting 4 or 5
> >> > > copies of the same message.
> >> > >
> >> > > Try setting max.in.flight.requests.per.connection=1 to get rid of
> >> > > duplicates.
> >> > > Try setting acks=all to ensure the messages are being persisted
> >> > > by the leader and all the available replicas in the kafka cluster.
> >> > >
> >> > > -hans
> >> > >
> >> > > /**
> >> > >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> > >  * hans@confluent.io (650)924-2670  */
> >> > >
> >> > >
> >> >
> >>
> >
> >
>

RE: Re: Re: ZK and Kafka failover testing

Posted by Shrikant Patel <SP...@pdxinc.com>.
Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your responses.

Just to summarize my steps briefly (5-node Kafka and ZK clusters):
1. The ZK cluster has all nodes working. The consumer is down.
2. Bring down a majority of the ZK nodes.
3. Things are functional, no issues (no duplicate or lost messages).
4. Now the first Kafka node comes down.
5. My issue starts happening - as you see below, the producer says the messages with keys 34 and 35 failed.
6. Bring a majority of the ZK nodes up.
7. The other Kafka nodes assume leadership for node 1's partitions.
8. Bring the consumer up; it starts consuming from the last offset and I see duplicates. I see message 34 (3 times) and 35 (4 times).


Jeff, in my case I don’t see an issue with the Kafka cluster recovering; once a majority of the ZK nodes are up, the other Kafka brokers take over leadership for the down node immediately.
Onur, as Jun mentioned, since I have acks=all, I am not seeing any messages being lost.

Jun, Hans, I had the same thought of trying to rule out the consumer getting duplicates because of incorrectly acking messages. In the next run of this test case, I did not run the consumer at all until the very end. My consumer and producer properties are in my earlier email in this thread. As I understand it, a RetriableException indicates a temporary issue, and I would like to retry to see if the issue resolves itself, hence the producer has retries=3.

Producer log

******************* Publisher #  Paritition - 12 Key - 26 Value - value 26
******************* Publisher #  Paritition - 13 Key - 27 Value - value 27
******************* Publisher #  Paritition - 14 Key - 28 Value - value 28
******************* Publisher #  Paritition - 0 Key - 29 Value - value 29
******************* Publisher #  Paritition - 1 Key - 30 Value - value 30
******************* Publisher #  Paritition - 2 Key - 31 Value - value 31
******************* Publisher #  Paritition - 3 Key - 32 Value - value 32
******************* Publisher #  Paritition - 4 Key - 33 Value - value 33
******************* Publisher #  Paritition - 5 Key - 34 Value - value 34
2017-04-19 16:39:08.008  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:39:39.128  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:40:10.271  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5:
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2017-04-19 16:42:50.621  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Failed to publish  Paritition - 5 Key - 34 Value - value 34
java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2017-04-19 16:42:51.001  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Publisher #  Paritition - 6 Key - 35 Value - value 35
2017-04-19 16:43:21.010  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:43:52.152  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:44:23.234  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='35' and payload='value 35' to topic ${topic-name} and partition 6:
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
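
A minimal sketch, not part of the original test: it measures the gaps between the log entries for key 35 (partition 6) and compares them with request.timeout.ms=30000 and retry.backoff.ms=1000 from the producer settings posted in this thread. The timestamps are copied from the log above; the helper name gaps_ms is made up for illustration. The gaps are consistent with every attempt waiting out the full request timeout, but the log alone does not show whether a given attempt was appended on the broker.

```python
from datetime import datetime

# Timestamps copied from the producer log for key 35 (partition 6).
events = [
    "2017-04-19 16:42:51.001",  # application logs the publish
    "2017-04-19 16:43:21.010",  # retrying (2 attempts left)
    "2017-04-19 16:43:52.152",  # retrying (1 attempts left)
    "2017-04-19 16:44:23.234",  # retrying (0 attempts left)
    "2017-04-19 16:44:54.421",  # final NetworkException
]

REQUEST_TIMEOUT_MS = 30000  # request.timeout.ms from the posted producer settings
RETRY_BACKOFF_MS = 1000     # retry.backoff.ms from the posted producer settings
RETRIES = 3                 # retries from the posted producer settings


def gaps_ms(stamps):
    """Return the gap in milliseconds between consecutive timestamps."""
    parsed = [datetime.strptime(s, "%Y-%m-%d %H:%M:%S.%f") for s in stamps]
    return [round((b - a).total_seconds() * 1000) for a, b in zip(parsed, parsed[1:])]


gaps = gaps_ms(events)
total_attempts = RETRIES + 1  # the initial send plus each retry

for gap in gaps:
    print(f"{gap} ms between log entries")
print(f"{total_attempts} send attempts in total")
```

The first gap is roughly the request timeout alone; the later gaps are roughly the request timeout plus the retry backoff.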

Consumer log (consumer only started at the very end of the test scenario)
value 21
value 22
value 23
value 24
value 25
value 26
value 27
value 28
value 29
value 30
value 31
value 32
value 33
value 34
value 34
value 34
value 35
value 35
value 35
value 35
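
A small sketch for tallying repeated values in consumer output like the listing above. The list is typed in from that listing; the same count could be run over values dumped directly from the partition log, which is the comparison suggested elsewhere in this thread for telling producer-side duplicates from consumer-side ones. Nothing here talks to Kafka.

```python
from collections import Counter

# Values as they appear in the consumer log: 21..33 once each,
# then "value 34" three times and "value 35" four times.
consumed = [f"value {n}" for n in range(21, 34)]
consumed += ["value 34"] * 3
consumed += ["value 35"] * 4

counts = Counter(consumed)

# Keep only the values that were delivered more than once.
duplicates = {value: n for value, n in counts.items() if n > 1}

for value, n in duplicates.items():
    print(f"{value}: seen {n} times")
```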

Output of describe command at point 1.

Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
        Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 5,1,2,3,4

(Since the majority of ZK nodes are down at point 6, my describe command does not work there.)
Output of describe command at point 2.

Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
        Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 2,3,4,5
        Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
        Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 4,2,5,3
        Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 3,4,2,5
        Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 2,5,3,4
        Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 4,5,2,3
        Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
        Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 2,3,5,4
        Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 2,4,5,3
        Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 3,4,2,5
        Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,2,3,4
        Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
        Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 4,3,5,2
        Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,3,2,4
        Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 4,2,5,3
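
A rough sketch for comparing two describe outputs like the ones above. Only four rows from each output are pasted in to keep it short, and the names ROW, parse_describe, POINT_1 and POINT_2 are made up for illustration. It lists the partitions whose leader changed and checks the ISR size against min.insync.replicas=3.

```python
import re

# Matches one partition row of the describe output.
ROW = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)

MIN_INSYNC_REPLICAS = 3  # from the topic config shown in the describe output


def parse_describe(text):
    """Map partition id -> (leader, sorted ISR) for each partition row."""
    rows = {}
    for partition, leader, _replicas, isr in ROW.findall(text):
        rows[int(partition)] = (int(leader), sorted(int(b) for b in isr.split(",")))
    return rows


# Four rows copied from the first describe output.
POINT_1 = """
Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
"""

# The same four partitions from the second describe output.
POINT_2 = """
Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 4,5,2,3
Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
"""

before = parse_describe(POINT_1)
after = parse_describe(POINT_2)

# Partitions whose leader differs between the two outputs.
moved = {
    p: (before[p][0], after[p][0])
    for p in sorted(before)
    if before[p][0] != after[p][0]
}

# Partitions whose ISR has fallen below min.insync.replicas.
under_min = [p for p, (_, isr) in after.items() if len(isr) < MIN_INSYNC_REPLICAS]

print("leader changes:", moved)
print("partitions under min.insync.replicas:", under_min)
```

In these rows, only the partitions previously led by broker 1 change leader, and every ISR still holds 4 brokers, which is above min.insync.replicas.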

Thanks,
Shri


-----Original Message-----
From: Jeff Widman [mailto:jeff@netskope.com]
Sent: Wednesday, April 19, 2017 4:11 PM
To: users@kafka.apache.org
Subject: [EXTERNAL] Re: Re: ZK and Kafka failover testing

Oops, I linked to the wrong ticket, this is the one we hit:
https://issues.apache.org/jira/browse/KAFKA-3042

On Wed, Apr 19, 2017 at 1:45 PM, Jeff Widman <je...@netskope.com> wrote:

> *As Onur explained, if ZK is down, Kafka can still work, but won't be
> able to react to actual broker failures until ZK is up again. So if a
> broker is down in that window, some of the partitions may not be ready
> for read or write.*
>
> We had a production scenario where ZK had a long GC pause and Kafka
> lost connection temporarily. The brokers kept sending data just fine
> for existing topics. However, when ZK came back, the kafka cluster
> could not recover gracefully because of this issue:
> https://issues.apache.org/jira/browse/KAFKA-2729
> IIRC, in our case, the mismatched cached data was the controller
> generation: the generations stored in zookeeper for the partition
> leaders did not match the generation id listed in the controller znode.
> Manually forcing a controller re-election solved this because it
> brought all generation IDs in sync. However, it would have been nice
> if the cluster had been able to automatically do the controller
> re-election without waiting for manual intervention.
>
> On Wed, Apr 19, 2017 at 1:30 PM, Jun Rao <ju...@confluent.io> wrote:
>
>> Hi, Shri,
>>
>> As Onur explained, if ZK is down, Kafka can still work, but won't be
>> able to react to actual broker failures until ZK is up again. So if a
>> broker is down in that window, some of the partitions may not be
>> ready for read or write.
>>
>> As for the duplicates in the consumer, Hans had a good point. It
>> would be useful to see if the duplicates are introduced by the
>> producer or the consumer. Perhaps you can read the log again and see
>> if duplicates are in the log in the first place. Note that broker
>> retries can introduce duplicates.
>>
>> Hi, Onur,
>>
>> For the data loss issue that you mentioned, that should only happen
>> with acks=1. As we discussed offline, if acks=all is used and unclean
>> leader election is disabled, acked messages shouldn't be lost.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
>>
>> > If this is what I think it is, it has nothing to do with acks,
>> > max.in.flight.requests.per.connection, or anything client-side and
>> > is purely about the kafka cluster.
>> >
>> > Here's a simple example involving a single zookeeper instance, 3
>> > brokers, a KafkaConsumer and KafkaProducer (neither of these clients
>> > interacts with zookeeper).
>> > 1. start up zookeeper:
>> > > ./bin/zookeeper-server-start.sh config/zookeeper.properties
>> >
>> > 2. start up some brokers:
>> > > ./bin/kafka-server-start.sh config/server0.properties
>> > > ./bin/kafka-server-start.sh config/server1.properties
>> > > ./bin/kafka-server-start.sh config/server2.properties
>> >
>> > 3. create a topic:
>> > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3
>> >
>> > 4. start a console consumer (this needs to happen before step 5 so
>> > that the __consumer_offsets topic metadata can be written to zookeeper):
>> > > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t
>> >
>> > 5. kill zookeeper
>> >
>> > 6. start a console producer and produce some messages:
>> > > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t
>> >
>> > 7. notice the size of the broker logs grow with each message you send:
>> > > ls -l /tmp/kafka-logs*/t-0
>> >
>> > 8. notice the consumer consuming the messages being produced
>> >
>> > Basically, zookeeper can be completely offline and your brokers
>> > will append to logs and process client requests just fine as long
>> > as they don't need to interact with zookeeper. Today, the only way a
>> > broker knows to stop accepting requests is when it receives
>> > instruction from the controller.
>> >
>> > I first realized this last July when debugging a small production
>> > data loss scenario that was a result of this[1]. Maybe this is an
>> > attempt at leaning towards availability over consistency. Personally
>> > I think that brokers should stop accepting requests when they
>> > disconnect from zookeeper.
>> >
>> > [1] The small production data loss scenario happens when accepting
>> > requests during the small window in between a broker's zookeeper
>> > session expiration and when the controller instructs the broker to
>> > stop accepting requests. During this time, the broker still thinks
>> > it leads partitions that are currently being led by another broker,
>> > effectively resulting in a window where the partition is led by two
>> > brokers. Clients can continue sending requests to the old leader,
>> > and for producers with low acknowledgement settings (like acks=1),
>> > their messages will be lost without the client knowing, as the
>> > messages are being appended to the phantom leader's logs instead of
>> > the true leader's logs.
>> >
>> > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com>
>> wrote:
>> >
>> > > While we were testing, our producer had the following configuration:
>> > > max.in.flight.requests.per.connection=1, acks=all and retries=3.
>> > >
>> > > The entire producer-side configuration is below. The consumer uses
>> > > manual offset commit; it commits the offset after it has
>> > > successfully processed the message.
>> > >
>> > > Producer setting
>> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
>> > > key.serializer= {appropriate value as per your case}
>> > > value.serializer= {appropriate value as per your case}
>> > > acks= all
>> > > retries=3
>> > > ssl.key.password= {appropriate value as per your case}
>> > > ssl.keystore.location= {appropriate value as per your case}
>> > > ssl.keystore.password= {appropriate value as per your case}
>> > > ssl.truststore.location= {appropriate value as per your case}
>> > > ssl.truststore.password= {appropriate value as per your case}
>> > > batch.size=16384
>> > > client.id= {appropriate value as per your case, may help with debugging}
>> > > max.block.ms=65000
>> > > request.timeout.ms=30000
>> > > security.protocol= SSL
>> > > ssl.enabled.protocols=TLSv1.2
>> > > ssl.keystore.type=JKS
>> > > ssl.protocol=TLSv1.2
>> > > ssl.truststore.type=JKS
>> > > max.in.flight.requests.per.connection=1
>> > > metadata.fetch.timeout.ms=60000
>> > > reconnect.backoff.ms=1000
>> > > retry.backoff.ms=1000
>> > > max.request.size=1048576
>> > > linger.ms=0
>> > >
>> > > Consumer setting
>> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
>> > > key.deserializer= {appropriate value as per your case}
>> > > value.deserializer= {appropriate value as per your case}
>> > > group.id= {appropriate value as per your case}
>> > > ssl.key.password= {appropriate value as per your case}
>> > > ssl.keystore.location= {appropriate value as per your case}
>> > > ssl.keystore.password= {appropriate value as per your case}
>> > > ssl.truststore.location= {appropriate value as per your case}
>> > > ssl.truststore.password= {appropriate value as per your case}
>> > > enable.auto.commit=false
>> > > security.protocol= SSL
>> > > ssl.enabled.protocols=TLSv1.2
>> > > ssl.keystore.type=JKS
>> > > ssl.protocol=TLSv1.2
>> > > ssl.truststore.type=JKS
>> > > client.id= {appropriate value as per your case, may help with debugging}
>> > > reconnect.backoff.ms=1000
>> > > retry.backoff.ms=1000
>> > >
>> > > Thanks,
>> > > Shri
>> > >
>> > > -----Original Message-----
>> > > From: Hans Jespersen [mailto:hans@confluent.io]
>> > > Sent: Tuesday, April 18, 2017 7:57 PM
>> > > To: users@kafka.apache.org
>> > > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
>> > >
>> > > When you publish, is acks=0, 1 or all (-1)?
>> > > What is max.in.flight.requests.per.connection (default is 5)?
>> > >
>> > > It sounds to me like your publishers are using acks=0 and so they
>> > > are not actually succeeding in publishing (i.e. you are getting no
>> > > acks) but they will retry over and over and will have up to 5
>> > > retries in flight, so when the broker comes back up, you are
>> > > getting 4 or 5 copies of the same message.
>> > >
>> > > Try setting max.in.flight.requests.per.connection=1 to get rid of
>> > > duplicates.
>> > > Try setting acks=all to ensure the messages are being persisted by
>> > > the leader and all the available replicas in the kafka cluster.
>> > >
>> > > -hans
>> > >
>> > > /**
>> > >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> > >  * hans@confluent.io (650)924-2670  */
>> > >
>> > > On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com> wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I am seeing strange behavior between ZK and Kafka. We have 5
>> > > > nodes each in the ZK and Kafka clusters. Kafka version -
>> > > > 2.11-0.10.1.1
>> > > >
>> > > > The min.insync.replicas is 3, replication.factor is 5 for all
>> > > > topics, unclean.leader.election.enable is false. We have 15
>> > > > partitions for each topic.
>> > > >
>> > > > The steps we are following in our testing:
>> > > >
>> > > > *         My understanding is that ZK needs at least 3 out of 5
>> > > > servers to be functional. Kafka could not be functional without
>> > > > zookeeper. In our testing, we bring down 3 ZK nodes and don't
>> > > > touch Kafka nodes. Kafka is still functional, consumer\producer
>> > > > can still consume\publish from Kafka cluster. We then bring down
>> > > > all ZK nodes, Kafka consumer\producers are still functional. I am
>> > > > not able to understand why Kafka cluster is not failing as soon
>> > > > as majority of ZK nodes are down. I do see error in Kafka that it
>> > > > cannot connect to ZK cluster.
>> > > >
>> > > > *         With all or majority of ZK nodes down, we bring down 1
>> > > > Kafka node (out of 5, so 4 are running). And at that point the
>> > > > consumer and producer start failing. My guess is the new
>> > > > leadership election cannot happen without ZK.
>> > > >
>> > > > *         Then we bring the majority of ZK nodes up. (1st Kafka is
>> > > > still down) Now the Kafka cluster becomes functional, consumer and
>> > > > producer now start working again. But Consumer sees big chunk of
>> > > > messages from kafka, and many of them are duplicates. It's like
>> > > > these messages were held up somewhere. Where\Why I don't know. And
>> > > > why the duplicates? I can understand few duplicates for messages
>> > > > that consumer would not commit before 1st node went down. But why
>> > > > so many duplicates, like 4 copies of each message? I cannot
>> > > > understand this behavior.
>> > > >
>> > > > Appreciate some insight about our issues. Also if there are
>> > > > blogs that describe the ZK and Kafka failover scenario behaviors,
>> > > > that would be extremely helpful.
>> > > >
>> > > > Thanks,
>> > > > Shri
>> > > >
>> > >
>> >
>>
>
>
This e-mail and its contents (to include attachments) are the property of National Health Systems, Inc., its subsidiaries and affiliates, including but not limited to Rx.com Community Healthcare Network, Inc. and its subsidiaries, and may contain confidential and proprietary or privileged information. If you are not the intended recipient of this e-mail, you are hereby notified that any unauthorized disclosure, copying, or distribution of this e-mail or of its attachments, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, please immediately notify the sender by telephone at 800-433-5719 or return e-mail and permanently delete the original e-mail.

Re: Re: Re: ZK and Kafka failover testing

Posted by Hans Jespersen <ha...@confluent.io>.
The kafka-console-producer.sh defaults to acks=1 so just be careful with
using those tools for too much debugging. Your output is helpful though.

https://github.com/apache/kafka/blob/5a2fcdd6d480e9f003cc49a59d5952ba4c515a71/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L185

-hans

On Wed, Apr 19, 2017 at 3:52 PM, Shrikant Patel <SP...@pdxinc.com> wrote:

> Just to add, I see the behavior below repeat even with the command line
> console producer and consumer that come with Kafka.
>
> Thanks,
> Shri
> __________________________________________________
> Shrikant Patel  |  817.367.4302
> Enterprise Architecture Team
> PDX-NHIN
>
>
> -----Original Message-----
> From: Shrikant Patel
> Sent: Wednesday, April 19, 2017 5:49 PM
> To: users@kafka.apache.org
> Subject: RE: [EXTERNAL] Re: Re: ZK and Kafka failover testing
>
> Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your response.
>
> Just to summarize briefly my steps, with a 5 node Kafka and ZK cluster:
> 1. ZK cluster has all nodes working. Consumer is down.
> 2. Bring down majority of ZK nodes.
> 3. Things are functional, no issue (no dup or lost message).
> 4. Now first kafka node comes down.
> 5. My issue starts happening - as you see below, producer says messages
> with key 34 and 35 failed.
> 6. Bring majority of ZK nodes up.
> 7. Other kafka nodes assume leadership for node 1's partitions.
> 8. Bring consumer up, it starts consuming from the last offset and I see
> duplicates. I see message 34 (3 times) and 35 (4 times).
>
>
> Jeff, in my case I don't see an issue with the kafka cluster recovering:
> once the majority of ZK nodes are up, the other Kafka nodes take up
> leadership for the down node immediately.
> Onur, as Jun mentioned, since I have acks=all, I am not seeing any
> messages being lost.
>
> Jun, Hans, I had the same thought of trying to rule out the consumer
> getting duplicates because of incorrectly acking the message. In the next
> run of this test case, I did not run the consumer at all (it was started
> only at the very end). My consumer and producer properties are in the
> first email in this thread. As I understand it, RetriableException is for
> temporary issues and I would like to retry to see if the issue resolves
> itself by then, hence the producer has retries=3.
>
> Producer log
>
> ******************* Publisher #  Paritition - 12 Key - 26 Value - value 26
> ******************* Publisher #  Paritition - 13 Key - 27 Value - value 27
> ******************* Publisher #  Paritition - 14 Key - 28 Value - value 28
> ******************* Publisher #  Paritition - 0 Key - 29 Value - value 29
> ******************* Publisher #  Paritition - 1 Key - 30 Value - value 30
> ******************* Publisher #  Paritition - 2 Key - 31 Value - value 31
> ******************* Publisher #  Paritition - 3 Key - 32 Value - value 32
> ******************* Publisher #  Paritition - 4 Key - 33 Value - value 33
> ******************* Publisher #  Paritition - 5 Key - 34 Value - value 34
> 2017-04-19 16:39:08.008  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:39:39.128  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:10.271  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5:
> org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> 2017-04-19 16:42:50.621  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Failed to publish  Paritition - 5 Key - 34 Value - value 34
> java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
> 2017-04-19 16:42:51.001  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Publisher #  Paritition - 6 Key - 35 Value - value 35
> 2017-04-19 16:43:21.010  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:43:52.152  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:44:23.234  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='35' and payload='value 35' to topic ${topic-name} and partition 6:
> org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
>
> Consumer log (consumer only started at the very end of the test scenario)
> value 21
> value 22
> value 23
> value 24
> value 25
> value 26
> value 27
> value 28
> value 29
> value 30
> value 31
> value 32
> value 33
> value 34
> value 34
> value 34
> value 35
> value 35
> value 35
> value 35
>
> Output of describe command at point 1.
>
> Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
>         Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,1,2,3,4
>         Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 5,1,2,3,4
>
> (since majority ZK are down at point 6 my describe command does not work)
> Output of describe command at point 2.
>
> Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
>         Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 2,3,4,5
>         Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
>         Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 4,2,5,3
>         Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 3,4,2,5
>         Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 2,5,3,4
>         Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 4,5,2,3
>         Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
>         Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 2,3,5,4
>         Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 2,4,5,3
>         Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 3,4,2,5
>         Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,2,3,4
>         Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
>         Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 4,3,5,2
>         Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,3,2,4
>         Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 4,2,5,3
>
> Thanks,
> Shri
>
>
> -----Original Message-----
> From: Jeff Widman [mailto:jeff@netskope.com]
> Sent: Wednesday, April 19, 2017 4:11 PM
> To: users@kafka.apache.org
> Subject: [EXTERNAL] Re: Re: ZK and Kafka failover testing
>
> Oops, I linked to the wrong ticket, this is the one we hit:
> https://issues.apache.org/jira/browse/KAFKA-3042
>
> On Wed, Apr 19, 2017 at 1:45 PM, Jeff Widman <je...@netskope.com> wrote:
>
> > *As Onur explained, if ZK is down, Kafka can still work, but won't be
> > able to react to actual broker failures until ZK is up again. So if a
> > broker is down in that window, some of the partitions may not be ready
> > for read or write.*
> >
> > We had a production scenario where ZK had a long GC pause and Kafka
> > lost connection temporarily. The brokers kept sending data just fine
> > for existing topics. However, when ZK came back, the kafka cluster
> > could not recover gracefully because of this issue:
> > https://issues.apache.org/jira/browse/KAFKA-2729
> > IIRC, in our case, the mismatched cached data was the controller
> > generation: the generations stored in zookeeper for the partition
> > leaders did not match the generation id listed in the controller znode.
> > Manually forcing a controller re-election solved this because it
> > brought all generation IDs in sync. However, it would have been nice
> > if the cluster had been able to automatically do the controller
> > re-election without waiting for manual intervention.
> >
> > On Wed, Apr 19, 2017 at 1:30 PM, Jun Rao <ju...@confluent.io> wrote:
> >
> >> Hi, Shri,
> >>
> >> As Onur explained, if ZK is down, Kafka can still work, but won't be
> >> able to react to actual broker failures until ZK is up again. So if a
> >> broker is down in that window, some of the partitions may not be
> >> ready for read or write.
> >>
> >> As for the duplicates in the consumer, Hans had a good point. It
> >> would be useful to see if the duplicates are introduced by the
> >> producer or the consumer. Perhaps you can read the log again and see
> >> if duplicates are in the log in the first place. Note that broker
> >> retries can introduce duplicates.
> >>
> >> Hi, Onur,
> >>
> >> For the data loss issue that you mentioned, that should only happen
> >> with acks=1. As we discussed offline, if acks=all is used and unclean
> >> leader election is disabled, acked messages shouldn't be lost.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:
> >>
> >> > If this is what I think it is, it has nothing to do with acks,
> >> > max.in.flight.requests.per.connection, or anything client-side and
> >> > is purely about the kafka cluster.
> >> >
> >> > Here's a simple example involving a single zookeeper instance, 3
> >> > brokers, a KafkaConsumer and KafkaProducer (neither of these clients
> >> > interacts with zookeeper).
> >> > 1. start up zookeeper:
> >> > > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> >> >
> >> > 2. start up some brokers:
> >> > > ./bin/kafka-server-start.sh config/server0.properties
> >> > > ./bin/kafka-server-start.sh config/server1.properties
> >> > > ./bin/kafka-server-start.sh config/server2.properties
> >> >
> >> > 3. create a topic:
> >> > > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3
> >> >
> >> > 4. start a console consumer (this needs to happen before step 5 so
> >> > that the __consumer_offsets topic metadata can be written to zookeeper):
> >> > > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t
> >> >
> >> > 5. kill zookeeper
> >> >
> >> > 6. start a console producer and produce some messages:
> >> > > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t
> >> >
> >> > 7. notice the size of the broker logs grow with each message you send:
> >> > > l /tmp/kafka-logs*/t-0
> >> >
> >> > 8. notice the consumer consuming the messages being produced
> >> >
> >> > Basically, zookeeper can be completely offline and your brokers
> >> > will append to logs and process client requests just fine as long
> >> > as they don't need to interact with zookeeper. Today, the only way
> >> > a broker knows to stop accepting requests is when it receives an
> >> > instruction from the controller.
> >> >
> >> > I first realized this last July when debugging a small production
> >> > data loss scenario that was a result of this[1]. Maybe this is an
> >> > attempt at leaning towards availability over consistency.
> >> > Personally I think that brokers should stop accepting requests
> >> > when they disconnect from zookeeper.
> >> >
> >> > [1] The small production data loss scenario happens when a broker
> >> > accepts requests during the small window between its zookeeper
> >> > session expiration and the moment the controller instructs it to
> >> > stop accepting requests. During this time, the broker still thinks
> >> > it leads partitions that are currently being led by another
> >> > broker, effectively resulting in a window where the partition is
> >> > led by two brokers. Clients can continue sending requests to the
> >> > old leader, and for producers with low acknowledgement settings
> >> > (like acks=1), their messages will be lost without the client
> >> > knowing, as the messages are being appended to the phantom
> >> > leader's logs instead of the true leader's logs.
> >> >
> >> > On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com> wrote:
> >> >
> >> > > While we were testing, our producer had the following
> >> > > configuration: max.in.flight.requests.per.connection=1, acks=all
> >> > > and retries=3.
> >> > >
> >> > > The entire producer-side configuration is below. The consumer
> >> > > uses manual offset commit; it commits the offset after it has
> >> > > successfully processed the message.
> >> > >
> >> > > Producer setting
> >> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
> >> > > key.serializer= {appropriate value as per your case}
> >> > > value.serializer= {appropriate value as per your case}
> >> > > acks= all
> >> > > retries=3
> >> > > ssl.key.password= {appropriate value as per your case}
> >> > > ssl.keystore.location= {appropriate value as per your case}
> >> > > ssl.keystore.password= {appropriate value as per your case}
> >> > > ssl.truststore.location= {appropriate value as per your case}
> >> > > ssl.truststore.password= {appropriate value as per your case}
> >> > > batch.size=16384
> >> > > client.id= {appropriate value as per your case, may help with debugging}
> >> > > max.block.ms=65000
> >> > > request.timeout.ms=30000
> >> > > security.protocol= SSL
> >> > > ssl.enabled.protocols=TLSv1.2
> >> > > ssl.keystore.type=JKS
> >> > > ssl.protocol=TLSv1.2
> >> > > ssl.truststore.type=JKS
> >> > > max.in.flight.requests.per.connection=1
> >> > > metadata.fetch.timeout.ms=60000
> >> > > reconnect.backoff.ms=1000
> >> > > retry.backoff.ms=1000
> >> > > max.request.size=1048576
> >> > > linger.ms=0
> >> > >
> >> > > Consumer setting
> >> > > bootstrap.servers= {point the F5 VS fronting Kafka cluster}
> >> > > key.deserializer= {appropriate value as per your case}
> >> > > value.deserializer= {appropriate value as per your case}
> >> > > group.id= {appropriate value as per your case}
> >> > > ssl.key.password= {appropriate value as per your case}
> >> > > ssl.keystore.location= {appropriate value as per your case}
> >> > > ssl.keystore.password= {appropriate value as per your case}
> >> > > ssl.truststore.location= {appropriate value as per your case}
> >> > > ssl.truststore.password= {appropriate value as per your case}
> >> > > enable.auto.commit=false
> >> > > security.protocol= SSL
> >> > > ssl.enabled.protocols=TLSv1.2
> >> > > ssl.keystore.type=JKS
> >> > > ssl.protocol=TLSv1.2
> >> > > ssl.truststore.type=JKS
> >> > > client.id= {appropriate value as per your case, may help with debugging}
> >> > > reconnect.backoff.ms=1000
> >> > > retry.backoff.ms=1000
> >> > >
> >> > > Thanks,
> >> > > Shri
> >> > >
> >> > > -----Original Message-----
> >> > > From: Hans Jespersen [mailto:hans@confluent.io]
> >> > > Sent: Tuesday, April 18, 2017 7:57 PM
> >> > > To: users@kafka.apache.org
> >> > > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
> >> > >
> >> > > ***** Notice: This email was received from an external source
> >> > > *****
> >> > >
> >> > > When you publish, is acks=0, 1 or all (-1)?
> >> > > What is max.in.flight.requests.per.connection (default is 5)?
> >> > >
> >> > > It sounds to me like your publishers are using acks=0 and so they
> >> > > are not actually succeeding in publishing (i.e. you are getting
> >> > > no acks) but they will retry over and over and will have up to 5
> >> > > retries in flight, so when the broker comes back up, you are
> >> > > getting 4 or 5 copies of the same message.
> >> > >
> >> > > Try setting max.in.flight.requests.per.connection=1 to get rid of
> >> > > duplicates.
> >> > > Try setting acks=all to ensure the messages are being persisted
> >> > > by the leader and all the available replicas in the kafka cluster.
> >> > >
> >> > > -hans
> >> > >
> >> > > /**
> >> > >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >> > >  * hans@confluent.io (650)924-2670  */
> >> > >
> >> > > On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel
> >> > > <SP...@pdxinc.com>
> >> > wrote:
> >> > >
> >> > > > Hi All,
> >> > > >
> >> > > > I am seeing strange behavior between ZK and Kafka. We have 5
> >> > > > nodes each in the ZK and Kafka clusters. Kafka version -
> >> > > > 2.11-0.10.1.1
> >> > > >
> >> > > > The min.insync.replicas is 3, replication.factor is 5 for all
> >> > > > topics, unclean.leader.election.enable is false. We have 15
> >> > > > partitions for each topic.
> >> > > >
> >> > > > The steps we are following in our testing:
> >> > > >
> >> > > > *         My understanding is that ZK needs at least 3 out of 5
> >> > > > servers to be functional. Kafka could not be functional without
> >> > > > zookeeper. In our testing, we bring down 3 ZK nodes and don't
> >> > > > touch the Kafka nodes. Kafka is still functional;
> >> > > > consumer\producer can still consume\publish from the Kafka
> >> > > > cluster. We then bring down all ZK nodes; Kafka
> >> > > > consumer\producers are still functional. I am not able to
> >> > > > understand why the Kafka cluster is not failing as soon as the
> >> > > > majority of ZK nodes are down. I do see errors in Kafka that it
> >> > > > cannot connect to the ZK cluster.
> >> > > >
> >> > > > *         With all or a majority of ZK nodes down, we bring down
> >> > > > 1 Kafka node (out of 5, so 4 are running). At that point the
> >> > > > consumer and producer start failing. My guess is that the new
> >> > > > leader election cannot happen without ZK.
> >> > > >
> >> > > > *         Then we bring the majority of ZK nodes up. (1st Kafka
> >> > > > node is still down.) Now the Kafka cluster becomes functional;
> >> > > > consumer and producer start working again. But the consumer
> >> > > > sees a big chunk of messages from Kafka, and many of them are
> >> > > > duplicates. It's like these messages were held up somewhere.
> >> > > > Where\Why, I don't know. And why the duplicates? I can
> >> > > > understand a few duplicates for messages that the consumer
> >> > > > could not commit before the 1st node went down. But why so many
> >> > > > duplicates, like 4 copies of each message? I cannot understand
> >> > > > this behavior.
> >> > > >
> >> > > > Appreciate some insight about our issues. Also, if there are
> >> > > > blogs that describe the ZK and Kafka failover scenario
> >> > > > behaviors, that would be extremely helpful.
> >> > > >
> >> > > > Thanks,
> >> > > > Shri
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

RE: Re: Re: ZK and Kafka failover testing

Posted by Shrikant Patel <SP...@pdxinc.com>.
Just to add, I see the behavior below repeat even with the command-line console producer and consumer that come with Kafka.

Thanks,
Shri
__________________________________________________
Shrikant Patel  |  817.367.4302
Enterprise Architecture Team
PDX-NHIN


-----Original Message-----
From: Shrikant Patel
Sent: Wednesday, April 19, 2017 5:49 PM
To: users@kafka.apache.org
Subject: RE: [EXTERNAL] Re: Re: ZK and Kafka failover testing

Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your responses.

To briefly summarize my steps, with a 5-node Kafka cluster and a 5-node ZK cluster:
1. ZK cluster has all nodes working. Consumer is down.
2. Bring down the majority of ZK nodes.
3. Things are functional, no issues (no duplicate or lost messages).
4. Now the first Kafka node comes down.
5. My issue starts happening: as you can see below, the producer says the messages with keys 34 and 35 failed.
6. Bring the majority of ZK nodes up.
7. Other Kafka nodes assume leadership for node 1's partitions.
8. Bring the consumer up; it starts consuming from the last offset and I see duplicates. I see message 34 (3 times) and 35 (4 times).
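
As a side note on the arithmetic behind steps 2 and 6: a ZK ensemble is generally understood to need a strict majority of its nodes to be up, which for 5 nodes is 3. A minimal Python sketch of that rule follows. The function names are made up for illustration and are not part of any ZK or Kafka API.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of nodes that forms a strict majority."""
    return ensemble_size // 2 + 1


def has_quorum(ensemble_size: int, nodes_up: int) -> bool:
    """True if enough nodes are up for the ensemble to hold a majority."""
    return nodes_up >= quorum_size(ensemble_size)


if __name__ == "__main__":
    print(quorum_size(5))    # majority needed for a 5-node ensemble
    print(has_quorum(5, 2))  # step 2: 3 of the 5 ZK nodes are down
    print(has_quorum(5, 3))  # step 6: a majority is back up
```

This only covers the ZK side. As discussed in this thread, the brokers keep serving existing partitions without that quorum until something (such as a broker failure) requires ZK.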


Jeff, in my case I don't see an issue with the Kafka cluster recovering; once the majority of ZK nodes are up, the other Kafka nodes take over leadership for the down node immediately.
Onur, as Jun mentioned, since I have acks=all, I am not seeing any messages being lost.

Jun, Hans, I had the same thought of trying to rule out the consumer getting duplicates because of incorrectly acking the messages. In the next run of this test case, I did not run the consumer at all until the very end. My consumer and producer properties are in my first email in this thread. As I understand it, a RetriableException indicates a temporary issue, and I would like to retry to see if the issue resolves itself by then, hence the producer has retries=3.

Producer log

******************* Publisher #  Paritition - 12 Key - 26 Value - value 26
 ******************* Publisher #  Paritition - 13 Key - 27 Value - value 27
 ******************* Publisher #  Paritition - 14 Key - 28 Value - value 28
 ******************* Publisher #  Paritition - 0 Key - 29 Value - value 29
 ******************* Publisher #  Paritition - 1 Key - 30 Value - value 30
 ******************* Publisher #  Paritition - 2 Key - 31 Value - value 31
 ******************* Publisher #  Paritition - 3 Key - 32 Value - value 32
 ******************* Publisher #  Paritition - 4 Key - 33 Value - value 33
 ******************* Publisher #  Paritition - 5 Key - 34 Value - value 34
2017-04-19 16:39:08.008  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 37 on topic-partition ${topic-name}-5, retrying (2 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:39:39.128  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 39 on topic-partition ${topic-name}-5, retrying (1 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:40:10.271  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 41 on topic-partition ${topic-name}-5, retrying (0 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='34' and payload='value 34' to topic ${topic-name} and partition 5:
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2017-04-19 16:42:50.621  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Failed to publish  Paritition - 5 Key - 34 Value - value 34
java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
2017-04-19 16:42:51.001  INFO 399580 --- [pool-1-thread-1] c.p.p.SpringKafkaPublisher_Simple        : ******************* Publisher #  Paritition - 6 Key - 35 Value - value 35
2017-04-19 16:43:21.010  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 49 on topic-partition ${topic-name}-6, retrying (2 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:43:52.152  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 51 on topic-partition ${topic-name}-6, retrying (1 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:44:23.234  WARN 399580 --- [| shri-producer] o.a.k.clients.producer.internals.Sender  : Got error produce response with correlation id 53 on topic-partition ${topic-name}-6, retrying (0 attempts left). Error: NETWORK_EXCEPTION
2017-04-19 16:44:54.421 ERROR 399580 --- [| shri-producer] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='35' and payload='value 35' to topic ${topic-name} and partition 6:
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
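
One thing that can be read off the log above is the spacing between attempts. The Python sketch below takes the four timestamps logged for key 34 and compares the gaps with request.timeout.ms + retry.backoff.ms from the producer settings quoted in this thread. That each attempt waited for the full request timeout is an assumption; the log itself only reports NETWORK_EXCEPTION.

```python
from datetime import datetime

# Timestamps copied from the producer log above for key 34 (partition 5).
stamps = ["16:39:08.008", "16:39:39.128", "16:40:10.271", "16:40:41.419"]

# Values from the producer settings quoted in this thread.
request_timeout_ms = 30000
retry_backoff_ms = 1000
expected_ms = request_timeout_ms + retry_backoff_ms

times = [datetime.strptime(s, "%H:%M:%S.%f") for s in stamps]
# Gap between consecutive log lines, in whole milliseconds.
gaps_ms = [
    round((later - earlier).total_seconds() * 1000)
    for earlier, later in zip(times, times[1:])
]

for gap in gaps_ms:
    print(f"gap={gap} ms, expected about {expected_ms} ms, diff={gap - expected_ms} ms")
```

Each gap comes out a little over 31 seconds, which is at least consistent with every attempt timing out and then backing off before the next retry.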

Consumer log (consumer only started at the very end of the test scenario)
value 21
value 22
value 23
value 24
value 25
value 26
value 27
value 28
value 29
value 30
value 31
value 32
value 33
value 34
value 34
value 34
value 35
value 35
value 35
value 35
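
To make the duplicate counts explicit, here is a small Python sketch that tallies the values from the consumer log above and compares them with the number of send attempts allowed by retries=3. It does not show where the copies come from; treating each send attempt as the source of at most one copy is an assumption.

```python
from collections import Counter

# Values copied from the consumer log above, in the order they were printed.
consumed = list(range(21, 34)) + [34] * 3 + [35] * 4

retries = 3                 # producer setting quoted in this thread
max_attempts = 1 + retries  # the first send plus each retry

counts = Counter(consumed)
duplicates = {value: n for value, n in counts.items() if n > 1}

print(duplicates)
print(all(n <= max_attempts for n in duplicates.values()))
```

Only the two values whose sends failed on the producer side (34 and 35) show up more than once, and neither exceeds the four attempts the producer was allowed to make.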

Output of describe command at point 1.

Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
        Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,1,2,3,4
        Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 5,1,2,3,4

(Since the majority of ZK nodes are down at point 6, my describe command does not work.)
Output of describe command at point 2.

Topic:${topic-name}   PartitionCount:15       ReplicationFactor:5     Configs:min.insync.replicas=3
        Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 2,3,4,5
        Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
        Topic: ${topic-name}  Partition: 2    Leader: 2       Replicas: 2,1,3,4,5     Isr: 4,2,5,3
        Topic: ${topic-name}  Partition: 3    Leader: 3       Replicas: 3,2,4,5,1     Isr: 3,4,2,5
        Topic: ${topic-name}  Partition: 4    Leader: 4       Replicas: 4,3,5,1,2     Isr: 2,5,3,4
        Topic: ${topic-name}  Partition: 5    Leader: 5       Replicas: 5,1,2,3,4     Isr: 4,5,2,3
        Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
        Topic: ${topic-name}  Partition: 7    Leader: 2       Replicas: 2,3,4,5,1     Isr: 2,3,5,4
        Topic: ${topic-name}  Partition: 8    Leader: 3       Replicas: 3,4,5,1,2     Isr: 2,4,5,3
        Topic: ${topic-name}  Partition: 9    Leader: 4       Replicas: 4,5,1,2,3     Isr: 3,4,2,5
        Topic: ${topic-name}  Partition: 10   Leader: 5       Replicas: 5,2,3,4,1     Isr: 5,2,3,4
        Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
        Topic: ${topic-name}  Partition: 12   Leader: 2       Replicas: 2,4,5,1,3     Isr: 4,3,5,2
        Topic: ${topic-name}  Partition: 13   Leader: 3       Replicas: 3,5,1,2,4     Isr: 5,3,2,4
        Topic: ${topic-name}  Partition: 14   Leader: 4       Replicas: 4,1,2,3,5     Isr: 4,2,5,3
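
The two outputs above can be compared mechanically. The Python sketch below parses an excerpt of each (partitions 0, 1, 6 and 11 only, copied from above) and reports leader changes, brokers that left the ISR, and whether the ISR still satisfies min.insync.replicas=3. The regular expression is written for the layout shown here and is an assumption, not a general parser for the tool's output.

```python
import re

# Matches the per-partition lines of the describe output shown above.
LINE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)


def parse(text):
    """Return {partition: (leader, replicas, isr)} for each matching line."""
    result = {}
    for part, leader, replicas, isr in LINE.findall(text):
        result[int(part)] = (
            int(leader),
            [int(b) for b in replicas.split(",")],
            {int(b) for b in isr.split(",")},
        )
    return result


# Excerpt of the first output.
before = parse("""
Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 1    Leader: 1       Replicas: 1,5,2,3,4     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 6    Leader: 1       Replicas: 1,2,3,4,5     Isr: 5,1,2,3,4
Topic: ${topic-name}  Partition: 11   Leader: 1       Replicas: 1,3,4,5,2     Isr: 5,1,2,3,4
""")
# The same partitions from the second output.
after = parse("""
Topic: ${topic-name}  Partition: 0    Leader: 5       Replicas: 5,4,1,2,3     Isr: 2,3,4,5
Topic: ${topic-name}  Partition: 1    Leader: 5       Replicas: 1,5,2,3,4     Isr: 2,5,3,4
Topic: ${topic-name}  Partition: 6    Leader: 2       Replicas: 1,2,3,4,5     Isr: 3,4,5,2
Topic: ${topic-name}  Partition: 11   Leader: 3       Replicas: 1,3,4,5,2     Isr: 5,2,3,4
""")

MIN_INSYNC_REPLICAS = 3  # from Configs:min.insync.replicas=3 above

# partition -> (old leader, new leader) where the leader changed
moved = {p: (before[p][0], after[p][0]) for p in before if before[p][0] != after[p][0]}
# partition -> brokers present in the first ISR but not the second
dropped = {p: before[p][2] - after[p][2] for p in before}
isr_ok = all(len(after[p][2]) >= MIN_INSYNC_REPLICAS for p in after)

print("leader changes:", moved)
print("brokers that left the ISR:", dropped)
print("ISR still meets min.insync.replicas:", isr_ok)
```

In this excerpt, every partition that was led by broker 1 moved to the next broker in its Replicas list, broker 1 dropped out of every ISR, and the remaining 4 in-sync replicas are still above the configured minimum of 3.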

Thanks,
Shri


-----Original Message-----
From: Jeff Widman [mailto:jeff@netskope.com]
Sent: Wednesday, April 19, 2017 4:11 PM
To: users@kafka.apache.org
Subject: [EXTERNAL] Re: Re: ZK and Kafka failover testing

***** Notice: This email was received from an external source *****

Oops, I linked to the wrong ticket, this is the one we hit:
https://issues.apache.org/jira/browse/KAFKA-3042

On Wed, Apr 19, 2017 at 1:45 PM, Jeff Widman <je...@netskope.com> wrote:

>
> *As Onur explained, if ZK is down, Kafka can still work, but won't be
> able to react to actual broker failures until ZK is up again. So if a
> broker is down in that window, some of the partitions may not be ready
> for read or write.*
>
> We had a production scenario where ZK had a long GC pause and Kafka
> lost connection temporarily. The brokers kept sending data just fine
> for existing topics. However, when ZK came back, the kafka cluster
> could not recover gracefully because of this issue:
> https://issues.apache.org/jira/browse/KAFKA-2729
> IIRC, in our case, the cached data that was mismatched was the
> controller generations in zookeeper for the partition leaders, which
> did not match the generation id listed in the controller znode.
> Manually forcing a controller re-election solved this because it
> brought all generation IDs in sync. However, it would have been nice
> if the cluster had been able to automatically do the controller
> re-election without waiting for manual intervention.
>

Re: Re: ZK and Kafka failover testing

Posted by Jeff Widman <je...@netskope.com>.
Oops, I linked to the wrong ticket, this is the one we hit:
https://issues.apache.org/jira/browse/KAFKA-3042


Re: Re: ZK and Kafka failover testing

Posted by Jeff Widman <je...@netskope.com>.
*As Onur explained, if ZK is down, Kafka can still work, but won't be able
to react to actual broker failures until ZK is up again. So if a broker is
down in that window, some of the partitions may not be ready for read or
write.*
We had a production scenario where ZK had a long GC pause and Kafka lost
its connection temporarily. The brokers kept sending data just fine for
existing topics. However, when ZK came back, the Kafka cluster could not
recover gracefully because of this issue:
https://issues.apache.org/jira/browse/KAFKA-2729
IIRC, in our case the mismatched cached data was the controller generation:
the generation recorded in ZooKeeper for the partition leaders did not match
the generation ID listed in the controller znode.
Manually forcing a controller re-election solved this because it brought
all generation IDs in sync. However, it would have been nice if the cluster
had been able to automatically do the controller re-election without
waiting for manual intervention.
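
To make the mismatch described above concrete, here is a minimal Python sketch of the kind of check one could run by hand. It assumes you have already read the integer from the /controller_epoch znode and the JSON from each partition's state znode; the partition names and epoch values below are made-up sample data, and find_epoch_mismatches is a hypothetical helper, not a Kafka tool. An older recorded epoch can be perfectly normal for a partition whose leadership has not changed recently, so treat the output only as a starting point for investigation.

```python
import json


def find_epoch_mismatches(current_epoch, partition_states):
    """Return {partition: recorded_epoch} where the recorded controller
    epoch differs from the current controller epoch."""
    mismatches = {}
    for partition, raw_json in partition_states.items():
        state = json.loads(raw_json)
        recorded = state["controller_epoch"]
        if recorded != current_epoch:
            mismatches[partition] = recorded
    return mismatches


# Made-up sample data shaped like partition state znode contents.
sample_states = {
    "t-0": '{"controller_epoch": 7, "leader": 1, "version": 1, '
           '"leader_epoch": 3, "isr": [1, 2, 3]}',
    "t-1": '{"controller_epoch": 9, "leader": 2, "version": 1, '
           '"leader_epoch": 4, "isr": [2, 3]}',
}

mismatched = find_epoch_mismatches(7, sample_states)
print(mismatched)
```

A commonly described way to force a controller re-election is to delete the ephemeral /controller znode so the brokers race to recreate it; the message above does not say which method was actually used.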


Re: Re: ZK and Kafka failover testing

Posted by Jun Rao <ju...@confluent.io>.
Hi, Shri,

As Onur explained, if ZK is down, Kafka can still work, but won't be able
to react to actual broker failures until ZK is up again. So if a broker is
down in that window, some of the partitions may not be ready for read or
write.
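
A toy Python model of that window, using the numbers from this thread (5 brokers, 15 partitions). It is only a sketch: the round-robin leader assignment is an assumption, leaderless_partitions is a hypothetical helper, and the "ZK up" branch simply assumes a surviving in-sync replica exists to take over.

```python
def leaderless_partitions(leaders, failed_brokers, controller_can_act):
    """Return the partitions left without a usable leader.

    leaders: {partition: broker_id} before the failure.
    controller_can_act: False while ZK is unreachable, so no new
    leader can be elected for partitions led by a failed broker.
    """
    if controller_can_act:
        # Assumption: a surviving in-sync replica takes over.
        return []
    return sorted(p for p, broker in leaders.items()
                  if broker in failed_brokers)


# Assumed round-robin layout: partition p is led by broker p % 5.
leaders = {p: p % 5 for p in range(15)}

print(leaderless_partitions(leaders, {0}, controller_can_act=False))
print(leaderless_partitions(leaders, {0}, controller_can_act=True))
```

Under these assumptions, every partition that broker 0 was leading stays unavailable until ZK is back, while the partitions led by the other four brokers keep working.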

As for the duplicates in the consumer, Hans had a good point. It would be
useful to see if the duplicates are introduced by the producer or the
consumer. Perhaps you can read the log again and see if duplicates are in
the log in the first place. Note that broker retries can introduce
duplicates.
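
One way to do that check, sketched in Python. It assumes each payload carries something unique (an ID) and that the consumer records the partition and offset of everything it receives; classify_duplicates and the sample records are hypothetical. The same payload at different offsets means the duplicate is in the log itself, whereas the same offset seen more than once means the consumer re-read it.

```python
from collections import defaultdict


def classify_duplicates(records):
    """records: iterable of (partition, offset, payload) as seen by the consumer.

    Returns (in_log, redelivered):
      in_log      - payloads stored at more than one offset
      redelivered - (partition, offset) pairs delivered more than once
    """
    offsets_by_payload = defaultdict(set)
    deliveries = defaultdict(int)
    for partition, offset, payload in records:
        offsets_by_payload[payload].add((partition, offset))
        deliveries[(partition, offset)] += 1
    in_log = sorted(p for p, offs in offsets_by_payload.items()
                    if len(offs) > 1)
    redelivered = sorted(k for k, count in deliveries.items() if count > 1)
    return in_log, redelivered


# Made-up sample of what a consumer might have recorded.
observed = [
    (0, 10, "order-1"),
    (0, 11, "order-2"),
    (0, 12, "order-2"),  # same payload, new offset: written twice
    (0, 10, "order-1"),  # same offset again: re-read by the consumer
]

in_log, redelivered = classify_duplicates(observed)
print("duplicated in the log:", in_log)
print("re-read by the consumer:", redelivered)
```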

Hi, Onur,

For the data loss issue that you mentioned, that should only happen with
acks=1. As we discussed offline, if acks=all is used and unclean leader
election is disabled, acked messages shouldn't be lost.
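
A deliberately simplified Python model of why the acks setting matters here. It is a sketch under stated assumptions, not how the broker is implemented: produce_outcome is a hypothetical function, followers_replicating means the followers that actually fetch from the broker receiving the write, and real acks=all waits on the whole in-sync replica set rather than just counting up to min.insync.replicas.

```python
def produce_outcome(acks, leader_is_stale, followers_replicating,
                    min_insync_replicas):
    """Return (acked, silently_lost) for one produce request.

    leader_is_stale: the broker still believes it leads the partition
    although another broker has taken over.
    """
    if acks == "1":
        # Acknowledged as soon as the receiving broker appends locally.
        acked = True
    elif acks == "all":
        # Needs enough replicas (leader included) to have the message.
        acked = 1 + followers_replicating >= min_insync_replicas
    else:
        raise ValueError("this sketch only models acks=1 and acks=all")
    # A message is silently lost if the client got an ack from a
    # broker whose log will not become the partition's history.
    silently_lost = acked and leader_is_stale
    return acked, silently_lost


# min.insync.replicas=3 as in this thread; nobody fetches from a stale leader.
print(produce_outcome("1", True, 0, 3))
print(produce_outcome("all", True, 0, 3))
print(produce_outcome("all", False, 4, 3))
```

In this model the acks=all request to a stale leader is never acknowledged, so the producer sees a failure and can retry instead of losing the message without knowing.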

Thanks,

Jun


On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:

> If this is what I think it is, it has nothing to do with acks,
> max.in.flight.requests.per.connection, or anything client-side and is
> purely about the kafka cluster.
>
> Here's a simple example involving a single zookeeper instance, 3 brokers, a
> KafkaConsumer and KafkaProducer (neither of these clients interact with
> zookeeper).
> 1. start up zookeeper:
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
>
> 2. start up some brokers:
> > ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-server-start.sh config/server1.properties
> > ./bin/kafka-server-start.sh config/server2.properties
>
> 3. create a topic:
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3
>
> 4. start a console consumer (this needs to happen before step 5 so we can
> write __consumer_offsets metadata to zookeeper):
> > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 5. kill zookeeper
>
> 6. start a console producer and produce some messages:
> > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 7. notice the size of the broker logs grow with each message you send:
> > ls -l /tmp/kafka-logs*/t-0
>
> 8. notice the consumer consuming the messages being produced
>
> Basically, zookeeper can be completely offline and your brokers will append
> to logs and process client requests just fine as long as they don't need to
> interact with zookeeper. Today, the only way a broker knows to stop
> accepting requests is when it receives instruction from the controller.
>
> I first realized this last July when debugging a small production data loss
> scenario that was a result of this[1]. Maybe this is an attempt at leaning
> towards availability over consistency. Personally I think that brokers
> should stop accepting requests when they disconnect from zookeeper.
>
> [1] The small production data loss scenario happens when accepting requests
> during the small window in between a broker's zookeeper session expiration
> and when the controller instructs the broker to stop accepting requests.
> During this time, the broker still thinks it leads partitions that are
> currently being led by another broker, effectively resulting in a window
> where the partition is led by two brokers. Clients can continue sending
> requests to the old leader, and for producers with low acknowledgement
> settings (like ack=1), their messages will be lost without the client
> knowing, as the messages are being appended to the phantom leader's logs
> instead of the true leader's logs.
>
> On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com> wrote:
>
> > While we were testing, our producer had following configuration
> > max.in.flight.requests.per.connection=1, acks= all and retries=3.
> >
> > The entire producer side set is below. The consumer has manual offset
> > commit, it commit offset after it has successfully processed the message.
> >
> > Producer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.serializer= {appropriate value as per your cases}
> > value.serializer= {appropriate value as per your case}
> > acks= all
> > retries=3
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > batch.size=16384​
> > client.id= {appropriate value as per your case, may help with debugging}
> > max.block.ms​=65000
> > request.timeout.ms=30000
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > max.in.flight.requests.per.connection=1
> > metadata.fetch.timeout.ms=60000
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000
> > max.request.size=1048576​​
> > linger.ms=0
> >
> > Consumer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.deserializer= {appropriate value as per your cases}
> > value.deserializer= {appropriate value as per your case}
> > group.id= {appropriate value as per your case}
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > enable.auto.commit=false
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > client.id= {appropriate value as per your case, may help with debugging}
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000​
> >
> > Thanks,
> > Shri
> >
> > -----Original Message-----
> > From: Hans Jespersen [mailto:hans@confluent.io]
> > Sent: Tuesday, April 18, 2017 7:57 PM
> > To: users@kafka.apache.org
> > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
> >
> > ***** Notice: This email was received from an external source *****
> >
> > When you publish, is acks=0,1 or all (-1)?
> > What is max.in.flight.requests.per.connection (default is 5)?
> >
> > It sounds to me like your publishers are using acks=0 and so they are not
> > actually succeeding in publishing (i.e. you are getting no acks) but they
> > will retry over and over and will have up to 5 retries in flight, so when
> > the broker comes back up, you are getting 4 or 5 copies of the same
> message.
> >
> > Try setting max.in.flight.requests.per.connection=1 to get rid of
> > duplicates Try setting acks=all to ensure the messages are being
> persisted
> > by the leader and all the available replicas in the kafka cluster.
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * hans@confluent.io (650)924-2670
> >  */
> >
> > On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I am seeing strange behavior between ZK and Kafka. We have 5 node in
> > > ZK and Kafka cluster each. Kafka version - 2.11-0.10.1.1
> > >
> > > The min.insync.replicas is 3, replication.factor is 5 for all topics,
> > > unclean.leader.election.enable is false. We have 15 partitions for
> > > each topic.
> > >
> > > The step we are following in our testing.
> > >
> > >
> > > *         My understanding is that ZK needs aleast 3 out of 5 server to
> > be
> > > functional. Kafka could not be functional without zookeeper. In out
> > > testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka
> > > is still functional, consumer\producer can still consume\publish from
> > > Kafka cluster. We then bring down all ZK nodes, Kafka
> > > consumer\producers are still functional. I am not able to understand
> > > why Kafka cluster is not failing as soon as majority of ZK nodes are
> > > down. I do see error in Kafka that it cannot connection to ZK cluster.
> > >
> > >
> > >
> > > *         With all or majority of ZK node down, we bring down 1 Kafka
> > > nodes (out of 5, so 4 are running). And at that point the consumer and
> > > producer start failing. My guess is the new leadership election cannot
> > > happen without ZK.
> > >
> > >
> > >
> > > *         Then we bring up the majority of ZK node up. (1st Kafka is
> > still
> > > down) Now the Kafka cluster become functional, consumer and producer
> > > now start working again. But Consumer sees big junk of message from
> > > kafka, and many of them are duplicates. It's like these messages were
> > > held up somewhere, Where\Why I don't know?  And why the duplicates? I
> > > can understand few duplicates for messages that consumer would not
> > > commit before 1st node when down. But why so many duplicates and like
> > > 4 copy for each message. I cannot understand this behavior.
> > >
> > > Appreciate some insight about our issues. Also if there are blogs that
> > > describe the ZK and Kafka failover scenario behaviors, that would be
> > > extremely helpful.
> > >
> > > Thanks,
> > > Shri
> > >
> > > This e-mail and its contents (to include attachments) are the property
> > > of National Health Systems, Inc., its subsidiaries and affiliates,
> > > including but not limited to Rx.com Community Healthcare Network, Inc.
> > > and its subsidiaries, and may contain confidential and proprietary or
> > > privileged information. If you are not the intended recipient of this
> > > e-mail, you are hereby notified that any unauthorized disclosure,
> > > copying, or distribution of this e-mail or of its attachments, or the
> > > taking of any unauthorized action based on information contained herein
> > is strictly prohibited.
> > > Unauthorized use of information contained herein may subject you to
> > > civil and criminal prosecution and penalties. If you are not the
> > > intended recipient, please immediately notify the sender by telephone
> > > at
> > > 800-433-5719 or return e-mail and permanently delete the original
> e-mail.
> > >
> >
>

Re: Re: ZK and Kafka failover testing

Posted by Hans Jespersen <ha...@confluent.io>.
The OP was asking about duplicate messages, not lost messages, so I think
we are discussing two different possible scenarios. Whenever someone says
they see duplicate messages, it's good practice to first double-check the
ack mode, in-flight requests, and retries. It's also important to check
whether the messages are really duplicated in the Kafka log, or whether the
consumer is just seeing the same message reprocessed several times due to
some other issue with offset commits.
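
A rough way to tell the two cases apart is sketched below. It assumes the consumer logs every record it sees as a (partition, offset, payload) tuple; the function name `classify_duplicates` and the sample records are made up for illustration. The same payload at two different offsets points at a duplicate in the log (or at two genuinely identical messages, which this sketch cannot distinguish), while the same offset showing up twice points at redelivery to the consumer.

```python
from collections import defaultdict


def classify_duplicates(records):
    """Split repeated payloads into log duplicates and consumer redeliveries.

    records: iterable of (partition, offset, payload) tuples, in the order
    the consumer saw them.
    """
    positions = defaultdict(list)
    for partition, offset, payload in records:
        positions[payload].append((partition, offset))

    in_log, redelivered = set(), set()
    for payload, seen in positions.items():
        distinct = set(seen)
        if len(distinct) > 1:
            # Same payload stored at more than one position in the log.
            in_log.add(payload)
        if len(seen) > len(distinct):
            # Same position handed to the consumer more than once.
            redelivered.add(payload)
    return in_log, redelivered


# Hypothetical sample of what a consumer might have logged.
seen = [
    (0, 10, "order-1"),
    (0, 11, "order-1"),  # same payload, different offset
    (0, 12, "order-2"),
    (0, 12, "order-2"),  # same offset delivered twice
]
in_log, redelivered = classify_duplicates(seen)
print("duplicated in the log:", sorted(in_log))
print("redelivered to the consumer:", sorted(redelivered))
```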

-hans

On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apache@gmail.com> wrote:

> If this is what I think it is, it has nothing to do with acks,
> max.in.flight.requests.per.connection, or anything client-side and is
> purely about the kafka cluster.
>
> Here's a simple example involving a single zookeeper instance, 3 brokers, a
> KafkaConsumer and KafkaProducer (neither of these clients interact with
> zookeeper).
> 1. start up zookeeper:
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
>
> 2. start up some brokers:
> > ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-server-start.sh config/server1.properties
> > ./bin/kafka-server-start.sh config/server2.properties
>
> 3. create a topic:
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3
>
> 4. start a console consumer (this needs to happen before step 5 so we can
> write __consumer_offsets metadata to zookeeper):
> > ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 5. kill zookeeper
>
> 6. start a console producer and produce some messages:
> > ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 7. notice the size of the broker logs grow with each message you send:
> > ls -l /tmp/kafka-logs*/t-0
>
> 8. notice the consumer consuming the messages being produced
>
> Basically, zookeeper can be completely offline and your brokers will append
> to logs and process client requests just fine as long as it doesn't need to
> interact with zookeeper. Today, the only way a broker knows to stop
> accepting requests is when it receives instruction from the controller.
>
> I first realized this last July when debugging a small production data loss
> scenario that was a result of this[1]. Maybe this is an attempt at leaning
> towards availability over consistency. Personally I think that brokers
> should stop accepting requests when it disconnects from zookeeper.
>
> [1] The small production data loss scenario happens when accepting requests
> during the small window in between a broker's zookeeper session expiration
> and when the controller instructs the broker to stop accepting requests.
> During this time, the broker still thinks it leads partitions that are
> currently being led by another broker, effectively resulting in a window
> where the partition is led by two brokers. Clients can continue sending
> requests to the old leader, and for producers with low acknowledgement
> settings (like ack=1), their messages will be lost without the client
> knowing, as the messages are being appended to the phantom leader's logs
> instead of the true leader's logs.
>
> On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com> wrote:
>
> > While we were testing, our producer had following configuration
> > max.in.flight.requests.per.connection=1, acks= all and retries=3.
> >
> > The entire producer side set is below. The consumer has manual offset
> > commit, it commit offset after it has successfully processed the message.
> >
> > Producer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.serializer= {appropriate value as per your cases}
> > value.serializer= {appropriate value as per your case}
> > acks= all
> > retries=3
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > batch.size=16384​
> > client.id= {appropriate value as per your case, may help with debugging}
> > max.block.ms​=65000
> > request.timeout.ms=30000
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > max.in.flight.requests.per.connection=1
> > metadata.fetch.timeout.ms=60000
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000
> > max.request.size=1048576​​
> > linger.ms=0
> >
> > Consumer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.deserializer= {appropriate value as per your cases}
> > value.deserializer= {appropriate value as per your case}
> > group.id= {appropriate value as per your case}
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > enable.auto.commit=false
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > client.id= {appropriate value as per your case, may help with debugging}
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000​
> >
> > Thanks,
> > Shri
> >
> > -----Original Message-----
> > From: Hans Jespersen [mailto:hans@confluent.io]
> > Sent: Tuesday, April 18, 2017 7:57 PM
> > To: users@kafka.apache.org
> > Subject: [EXTERNAL] Re: ZK and Kafka failover testing
> >
> > ***** Notice: This email was received from an external source *****
> >
> > When you publish, is acks=0,1 or all (-1)?
> > What is max.in.flight.requests.per.connection (default is 5)?
> >
> > It sounds to me like your publishers are using acks=0 and so they are not
> > actually succeeding in publishing (i.e. you are getting no acks) but they
> > will retry over and over and will have up to 5 retries in flight, so when
> > the broker comes back up, you are getting 4 or 5 copies of the same
> message.
> >
> > Try setting max.in.flight.requests.per.connection=1 to get rid of
> > duplicates Try setting acks=all to ensure the messages are being
> persisted
> > by the leader and all the available replicas in the kafka cluster.
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * hans@confluent.io (650)924-2670
> >  */
> >
> > On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com>
> wrote:
> >
> > > Hi All,
> > >
> > > I am seeing strange behavior between ZK and Kafka. We have 5 node in
> > > ZK and Kafka cluster each. Kafka version - 2.11-0.10.1.1
> > >
> > > The min.insync.replicas is 3, replication.factor is 5 for all topics,
> > > unclean.leader.election.enable is false. We have 15 partitions for
> > > each topic.
> > >
> > > The step we are following in our testing.
> > >
> > >
> > > *         My understanding is that ZK needs aleast 3 out of 5 server to
> > be
> > > functional. Kafka could not be functional without zookeeper. In out
> > > testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka
> > > is still functional, consumer\producer can still consume\publish from
> > > Kafka cluster. We then bring down all ZK nodes, Kafka
> > > consumer\producers are still functional. I am not able to understand
> > > why Kafka cluster is not failing as soon as majority of ZK nodes are
> > > down. I do see error in Kafka that it cannot connection to ZK cluster.
> > >
> > >
> > >
> > > *         With all or majority of ZK node down, we bring down 1 Kafka
> > > nodes (out of 5, so 4 are running). And at that point the consumer and
> > > producer start failing. My guess is the new leadership election cannot
> > > happen without ZK.
> > >
> > >
> > >
> > > *         Then we bring up the majority of ZK node up. (1st Kafka is
> > still
> > > down) Now the Kafka cluster become functional, consumer and producer
> > > now start working again. But Consumer sees big junk of message from
> > > kafka, and many of them are duplicates. It's like these messages were
> > > held up somewhere, Where\Why I don't know?  And why the duplicates? I
> > > can understand few duplicates for messages that consumer would not
> > > commit before 1st node when down. But why so many duplicates and like
> > > 4 copy for each message. I cannot understand this behavior.
> > >
> > > Appreciate some insight about our issues. Also if there are blogs that
> > > describe the ZK and Kafka failover scenario behaviors, that would be
> > > extremely helpful.
> > >
> > > Thanks,
> > > Shri
> > >
> > >
> >
>

Re: Re: ZK and Kafka failover testing

Posted by Onur Karaman <on...@gmail.com>.
If this is what I think it is, it has nothing to do with acks,
max.in.flight.requests.per.connection, or anything client-side and is
purely about the kafka cluster.

Here's a simple example involving a single zookeeper instance, 3 brokers, a
KafkaConsumer and a KafkaProducer (neither of these clients interacts with
zookeeper).
1. start up zookeeper:
> ./bin/zookeeper-server-start.sh config/zookeeper.properties

2. start up some brokers:
> ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-server-start.sh config/server1.properties
> ./bin/kafka-server-start.sh config/server2.properties

3. create a topic:
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 3

4. start a console consumer (this needs to happen before step 5 so we can
write __consumer_offsets metadata to zookeeper):
> ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9090,localhost:9091,localhost:9092 --topic t

5. kill zookeeper

6. start a console producer and produce some messages:
> ./bin/kafka-console-producer.sh --broker-list localhost:9090,localhost:9091,localhost:9092 --topic t

7. notice the size of the broker logs grow with each message you send:
> ls -l /tmp/kafka-logs*/t-0

8. notice the consumer consuming the messages being produced

Basically, zookeeper can be completely offline and your brokers will append
to logs and process client requests just fine as long as they don't need to
interact with zookeeper. Today, the only way a broker knows to stop
accepting requests is when it receives instruction from the controller.

I first realized this last July when debugging a small production data loss
scenario that was a result of this[1]. Maybe this is an attempt at leaning
towards availability over consistency. Personally, I think that a broker
should stop accepting requests when it disconnects from zookeeper.

[1] The small production data loss scenario happens when accepting requests
during the small window in between a broker's zookeeper session expiration
and when the controller instructs the broker to stop accepting requests.
During this time, the broker still thinks it leads partitions that are
currently being led by another broker, effectively resulting in a window
where the partition is led by two brokers. Clients can continue sending
requests to the old leader, and for producers with low acknowledgement
settings (like acks=1), their messages will be lost without the client
knowing, as the messages are being appended to the phantom leader's logs
instead of the true leader's logs.
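
To make the window in [1] concrete, here is a toy model in Python. It is only a sketch: the `Broker` class, the broker names and the messages are invented, replication is reduced to list copying, and real brokers truncate and refetch in a more involved way. It only shows why a write acked by the stale leader under acks=1 can vanish.

```python
class Broker:
    """Toy broker: just a name and an append-only log."""

    def __init__(self, name):
        self.name = name
        self.log = []

    def append(self, message):
        self.log.append(message)
        return message  # with acks=1 the client is acked after this local append


old_leader = Broker("broker-0")
new_leader = Broker("broker-1")

# Messages replicated to both brokers before broker-0's session expired.
for message in ["m1", "m2"]:
    old_leader.append(message)
    new_leader.append(message)
shared_prefix = len(new_leader.log)

# The window: broker-1 has been elected, broker-0 has not been told yet.
acked_by_old_leader = [old_leader.append(m) for m in ["m3", "m4"]]
new_leader.append("m5")

# broker-0 finally becomes a follower: it drops its divergent suffix and
# copies the true leader's log from the shared prefix onward.
old_leader.log = old_leader.log[:shared_prefix] + new_leader.log[shared_prefix:]

lost = [m for m in acked_by_old_leader if m not in new_leader.log]
print("acked but lost:", lost)
print("final log on both brokers:", new_leader.log)
```

In this model the producer got an ack for m3 and m4, yet neither survives once broker-0 falls in line behind the true leader.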

On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <SP...@pdxinc.com> wrote:

> While we were testing, our producer had following configuration
> max.in.flight.requests.per.connection=1, acks= all and retries=3.
>
> The entire producer side set is below. The consumer has manual offset
> commit, it commit offset after it has successfully processed the message.
>
> Producer setting
> bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> key.serializer= {appropriate value as per your cases}
> value.serializer= {appropriate value as per your case}
> acks= all
> retries=3
> ssl.key.password= {appropriate value as per your case}
> ssl.keystore.location= {appropriate value as per your case}
> ssl.keystore.password= {appropriate value as per your case}
> ssl.truststore.location= {appropriate value as per your case}
> ssl.truststore.password= {appropriate value as per your case}
> batch.size=16384​
> client.id= {appropriate value as per your case, may help with debugging}
> max.block.ms​=65000
> request.timeout.ms=30000
> security.protocol= SSL
> ssl.enabled.protocols=TLSv1.2
> ssl.keystore.type=JKS
> ssl.protocol=TLSv1.2
> ssl.truststore.type=JKS
> max.in.flight.requests.per.connection=1
> metadata.fetch.timeout.ms=60000
> reconnect.backoff.ms=1000
> retry.backoff.ms​=1000
> max.request.size=1048576​​
> linger.ms=0
>
> Consumer setting
> bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> key.deserializer= {appropriate value as per your cases}
> value.deserializer= {appropriate value as per your case}
> group.id= {appropriate value as per your case}
> ssl.key.password= {appropriate value as per your case}
> ssl.keystore.location= {appropriate value as per your case}
> ssl.keystore.password= {appropriate value as per your case}
> ssl.truststore.location= {appropriate value as per your case}
> ssl.truststore.password= {appropriate value as per your case}
> enable.auto.commit=false
> security.protocol= SSL
> ssl.enabled.protocols=TLSv1.2
> ssl.keystore.type=JKS
> ssl.protocol=TLSv1.2
> ssl.truststore.type=JKS
> client.id= {appropriate value as per your case, may help with debugging}​
> reconnect.backoff.ms=1000
> retry.backoff.ms​=1000​
>
> Thanks,
> Shri
>
> -----Original Message-----
> From: Hans Jespersen [mailto:hans@confluent.io]
> Sent: Tuesday, April 18, 2017 7:57 PM
> To: users@kafka.apache.org
> Subject: [EXTERNAL] Re: ZK and Kafka failover testing
>
> ***** Notice: This email was received from an external source *****
>
> When you publish, is acks=0,1 or all (-1)?
> What is max.in.flight.requests.per.connection (default is 5)?
>
> It sounds to me like your publishers are using acks=0 and so they are not
> actually succeeding in publishing (i.e. you are getting no acks) but they
> will retry over and over and will have up to 5 retries in flight, so when
> the broker comes back up, you are getting 4 or 5 copies of the same message.
>
> Try setting max.in.flight.requests.per.connection=1 to get rid of
> duplicates Try setting acks=all to ensure the messages are being persisted
> by the leader and all the available replicas in the kafka cluster.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * hans@confluent.io (650)924-2670
>  */
>
> On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com> wrote:
>
> > Hi All,
> >
> > I am seeing strange behavior between ZK and Kafka. We have 5 node in
> > ZK and Kafka cluster each. Kafka version - 2.11-0.10.1.1
> >
> > The min.insync.replicas is 3, replication.factor is 5 for all topics,
> > unclean.leader.election.enable is false. We have 15 partitions for
> > each topic.
> >
> > The step we are following in our testing.
> >
> >
> > *         My understanding is that ZK needs aleast 3 out of 5 server to
> be
> > functional. Kafka could not be functional without zookeeper. In out
> > testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka
> > is still functional, consumer\producer can still consume\publish from
> > Kafka cluster. We then bring down all ZK nodes, Kafka
> > consumer\producers are still functional. I am not able to understand
> > why Kafka cluster is not failing as soon as majority of ZK nodes are
> > down. I do see error in Kafka that it cannot connection to ZK cluster.
> >
> >
> >
> > *         With all or majority of ZK node down, we bring down 1 Kafka
> > nodes (out of 5, so 4 are running). And at that point the consumer and
> > producer start failing. My guess is the new leadership election cannot
> > happen without ZK.
> >
> >
> >
> > *         Then we bring up the majority of ZK node up. (1st Kafka is
> still
> > down) Now the Kafka cluster become functional, consumer and producer
> > now start working again. But Consumer sees big junk of message from
> > kafka, and many of them are duplicates. It's like these messages were
> > held up somewhere, Where\Why I don't know?  And why the duplicates? I
> > can understand few duplicates for messages that consumer would not
> > commit before 1st node when down. But why so many duplicates and like
> > 4 copy for each message. I cannot understand this behavior.
> >
> > Appreciate some insight about our issues. Also if there are blogs that
> > describe the ZK and Kafka failover scenario behaviors, that would be
> > extremely helpful.
> >
> > Thanks,
> > Shri
> >
> >
>

RE: Re: ZK and Kafka failover testing

Posted by Shrikant Patel <SP...@pdxinc.com>.
While we were testing, our producer had the following configuration: max.in.flight.requests.per.connection=1, acks=all and retries=3.

The full set of producer settings is below. The consumer uses manual offset commits; it commits the offset after it has successfully processed the message.

Producer settings
bootstrap.servers= {point to the F5 VS fronting the Kafka cluster}
key.serializer= {appropriate value as per your case}
value.serializer= {appropriate value as per your case}
acks=all
retries=3
ssl.key.password= {appropriate value as per your case}
ssl.keystore.location= {appropriate value as per your case}
ssl.keystore.password= {appropriate value as per your case}
ssl.truststore.location= {appropriate value as per your case}
ssl.truststore.password= {appropriate value as per your case}
batch.size=16384
client.id= {appropriate value as per your case, may help with debugging}
max.block.ms=65000
request.timeout.ms=30000
security.protocol=SSL
ssl.enabled.protocols=TLSv1.2
ssl.keystore.type=JKS
ssl.protocol=TLSv1.2
ssl.truststore.type=JKS
max.in.flight.requests.per.connection=1
metadata.fetch.timeout.ms=60000
reconnect.backoff.ms=1000
retry.backoff.ms=1000
max.request.size=1048576
linger.ms=0

Consumer settings
bootstrap.servers= {point to the F5 VS fronting the Kafka cluster}
key.deserializer= {appropriate value as per your case}
value.deserializer= {appropriate value as per your case}
group.id= {appropriate value as per your case}
ssl.key.password= {appropriate value as per your case}
ssl.keystore.location= {appropriate value as per your case}
ssl.keystore.password= {appropriate value as per your case}
ssl.truststore.location= {appropriate value as per your case}
ssl.truststore.password= {appropriate value as per your case}
enable.auto.commit=false
security.protocol=SSL
ssl.enabled.protocols=TLSv1.2
ssl.keystore.type=JKS
ssl.protocol=TLSv1.2
ssl.truststore.type=JKS
client.id= {appropriate value as per your case, may help with debugging}
reconnect.backoff.ms=1000
retry.backoff.ms=1000
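
For reference, the commit-after-processing pattern described above behaves roughly like the toy loop below. This is a sketch, not our actual consumer code: the `consume` function, the in-memory `log` list and the crash point are all invented for illustration. It shows the at-least-once effect of this pattern: a record that was processed but not yet committed is processed again after a restart.

```python
def consume(log, committed, crash_before_commit_at=None):
    """Process records starting at `committed`; return (processed, committed).

    A crash is simulated after processing the record at
    `crash_before_commit_at` but before its offset is committed.
    """
    processed = []
    for offset in range(committed, len(log)):
        processed.append(log[offset])      # 1. process the record
        if offset == crash_before_commit_at:
            return processed, committed    # crashed: commit never happened
        committed = offset + 1             # 2. commit the next offset to read
    return processed, committed


log = ["a", "b", "c", "d"]

# First run dies after processing offset 2 but before committing it.
first_run, committed = consume(log, 0, crash_before_commit_at=2)

# After the restart, consumption resumes from the last committed offset.
second_run, committed = consume(log, committed)

print("first run processed:", first_run)
print("second run processed:", second_run)
```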

Thanks,
Shri

-----Original Message-----
From: Hans Jespersen [mailto:hans@confluent.io]
Sent: Tuesday, April 18, 2017 7:57 PM
To: users@kafka.apache.org
Subject: [EXTERNAL] Re: ZK and Kafka failover testing

***** Notice: This email was received from an external source *****

When you publish, is acks=0,1 or all (-1)?
What is max.in.flight.requests.per.connection (default is 5)?

It sounds to me like your publishers are using acks=0, so they are not actually succeeding in publishing (i.e. you are getting no acks), but they will retry over and over and will have up to 5 retries in flight, so when the broker comes back up you are getting 4 or 5 copies of the same message.

Try setting max.in.flight.requests.per.connection=1 to get rid of duplicates. Try setting acks=all to ensure the messages are being persisted by the leader and all the available replicas in the kafka cluster.
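
One caveat worth keeping in mind: even with a single request in flight and acks=all, a retry can still write the same message more than once if the broker appended it but the ack never reached the producer. The Python sketch below is a toy model of that, not the real producer code; `send_with_retries` and the set of lost acks are hypothetical, and it is not meant as a diagnosis of the behaviour reported in this thread.

```python
def send_with_retries(broker_log, message, retries, ack_lost_on_attempts):
    """Send one message, retrying when no ack is received.

    ack_lost_on_attempts: attempt numbers (0-based) where the broker
    appended the message but the ack was lost or timed out.
    Returns True if an ack eventually arrived.
    """
    for attempt in range(retries + 1):
        broker_log.append(message)          # broker persists the message
        if attempt not in ack_lost_on_attempts:
            return True                     # producer sees the ack
        # No ack: the producer cannot tell whether the write happened,
        # so it sends the same message again.
    return False


broker_log = []
acked = send_with_retries(
    broker_log, "m1", retries=3, ack_lost_on_attempts={0, 1, 2}
)
print("acked:", acked)
print("copies in the log:", broker_log.count("m1"))
```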

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * hans@confluent.io (650)924-2670
 */

On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com> wrote:

> Hi All,
>
> I am seeing strange behavior between ZK and Kafka. We have 5 nodes each
> in the ZK and Kafka clusters. Kafka version - 2.11-0.10.1.1
>
> The min.insync.replicas is 3, replication.factor is 5 for all topics,
> unclean.leader.election.enable is false. We have 15 partitions for
> each topic.
>
> The steps we are following in our testing:
>
>
> *         My understanding is that ZK needs at least 3 out of 5 servers to
> be functional. Kafka could not be functional without ZooKeeper. In our
> testing, we bring down 3 ZK nodes and don't touch the Kafka nodes. Kafka
> is still functional; consumers\producers can still consume\publish from
> the Kafka cluster. We then bring down all ZK nodes, and the Kafka
> consumers\producers are still functional. I am not able to understand
> why the Kafka cluster is not failing as soon as a majority of ZK nodes
> are down. I do see errors in Kafka that it cannot connect to the ZK
> cluster.
>
>
>
> *         With all or a majority of ZK nodes down, we bring down 1 Kafka
> node (out of 5, so 4 are running). At that point the consumer and
> producer start failing. My guess is that a new leader election cannot
> happen without ZK.
>
>
>
> *         Then we bring the majority of ZK nodes back up (the 1st Kafka node
> is still down). Now the Kafka cluster becomes functional, and the
> consumer and producer start working again. But the consumer sees a big
> chunk of messages from Kafka, and many of them are duplicates. It's like
> these messages were held up somewhere. Where and why, I don't know. And
> why the duplicates? I can understand a few duplicates for messages that
> the consumer could not commit before the 1st node went down. But why so
> many duplicates, about 4 copies of each message? I cannot understand
> this behavior.
>
> Appreciate some insight about our issues. Also if there are blogs that
> describe the ZK and Kafka failover scenario behaviors, that would be
> extremely helpful.
>
> Thanks,
> Shri
>
This e-mail and its contents (to include attachments) are the property of National Health Systems, Inc., its subsidiaries and affiliates, including but not limited to Rx.com Community Healthcare Network, Inc. and its subsidiaries, and may contain confidential and proprietary or privileged information. If you are not the intended recipient of this e-mail, you are hereby notified that any unauthorized disclosure, copying, or distribution of this e-mail or of its attachments, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, please immediately notify the sender by telephone at 800-433-5719 or return e-mail and permanently delete the original e-mail.

Re: ZK and Kafka failover testing

Posted by Hans Jespersen <ha...@confluent.io>.
When you publish, is acks=0,1 or all (-1)?
What is max.in.flight.requests.per.connection (default is 5)?

It sounds to me like your publishers are using acks=0 and so they are not
actually succeeding in publishing (i.e. you are getting no acks) but they
will retry over and over and will have up to 5 retries in flight, so when
the broker comes back up, you are getting 4 or 5 copies of the same
message.

Try setting max.in.flight.requests.per.connection=1 to get rid of duplicates.
Try setting acks=all to ensure the messages are being persisted by the
leader and all the available replicas in the Kafka cluster.
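
For reference, the two settings suggested above could be applied roughly as in the sketch below. This assumes the Java client; the class name ProducerSettings and the address broker1:9093 are placeholders for illustration and do not come from this thread. The resulting Properties would still need serializer and SSL settings before being handed to a producer, and whether these values remove the duplicates in the scenario described is something the failover test itself would have to confirm.

```java
import java.util.Properties;

public class ProducerSettings {

    // Builds a partial set of producer properties with the two settings
    // discussed in this thread. The bootstrap address is supplied by the
    // caller; serializers and SSL settings are intentionally left out.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Wait for the leader and all in-sync replicas to acknowledge a write.
        props.setProperty("acks", "all");
        // Allow only one unacknowledged request per connection at a time.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address, shown only to make the sketch runnable.
        Properties props = build("broker1:9093");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The same three keys could equally be written straight into the producer's .properties file, in the same style as the consumer settings posted earlier in the thread.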

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * hans@confluent.io (650)924-2670
 */

On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <SP...@pdxinc.com> wrote:

> Hi All,
>
> I am seeing strange behavior between ZK and Kafka. We have 5 nodes each
> in the ZK and Kafka clusters. Kafka version - 2.11-0.10.1.1
>
> The min.insync.replicas is 3, replication.factor is 5 for all topics,
> unclean.leader.election.enable is false. We have 15 partitions for
> each topic.
>
> The steps we are following in our testing:
>
>
> *         My understanding is that ZK needs at least 3 out of 5 servers to
> be functional. Kafka could not be functional without ZooKeeper. In our
> testing, we bring down 3 ZK nodes and don't touch the Kafka nodes. Kafka
> is still functional; consumers\producers can still consume\publish from
> the Kafka cluster. We then bring down all ZK nodes, and the Kafka
> consumers\producers are still functional. I am not able to understand
> why the Kafka cluster is not failing as soon as a majority of ZK nodes
> are down. I do see errors in Kafka that it cannot connect to the ZK
> cluster.
>
>
>
> *         With all or a majority of ZK nodes down, we bring down 1 Kafka
> node (out of 5, so 4 are running). At that point the consumer and
> producer start failing. My guess is that a new leader election cannot
> happen without ZK.
>
>
>
> *         Then we bring the majority of ZK nodes back up (the 1st Kafka node
> is still down). Now the Kafka cluster becomes functional, and the
> consumer and producer start working again. But the consumer sees a big
> chunk of messages from Kafka, and many of them are duplicates. It's like
> these messages were held up somewhere. Where and why, I don't know. And
> why the duplicates? I can understand a few duplicates for messages that
> the consumer could not commit before the 1st node went down. But why so
> many duplicates, about 4 copies of each message? I cannot understand
> this behavior.
>
> Appreciate some insight about our issues. Also if there are blogs that
> describe the ZK and Kafka failover scenario behaviors, that would be
> extremely helpful.
>
> Thanks,
> Shri
>