Posted to users@kafka.apache.org by Oliver Dain <od...@3cinteractive.com> on 2014/03/26 19:59:17 UTC

data loss on replicated topic

My company is currently testing Kafka for throughput and fault tolerance. We've set up a cluster of 5 Kafka brokers and are publishing to a topic with replication factor 3 and 100 partitions. We are publishing with request.required.acks = -1 (i.e. all ISR replicas must ACK before the message is considered sent). If a publication fails, we retry it indefinitely until it succeeds. We ran a test over a weekend in which we published messages as fast as we could (from a single publisher). Each message has a unique ID so that, at the end of the test, we can check that every message was saved by Kafka at least once. We have a simple script, run via cron, that kills one broker (chosen at random) once every other hour (via "kill -9"). The broker is then revived 16 minutes after it was killed. At the end of the weekend we ran a script to pull all data from all partitions and verify that all messages were persisted by Kafka. For the most part, the results are very good. We can sustain about 3k messages/second with almost no data loss.
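
The end-of-test check described above can be sketched roughly as follows. This is only an illustration, assuming the publisher keeps a record of every ID it sent and the read-back script collects every ID it pulled from the partitions; the function name find_lost_ids and the sample IDs are hypothetical and are not the actual test code.

```python
from collections import Counter


def find_lost_ids(produced_ids, consumed_ids):
    """Return (lost, duplicated) given the IDs sent and the IDs read back.

    At-least-once delivery allows duplicates (a retried send may land twice),
    so only IDs that never show up in the read-back count as lost.
    """
    seen = Counter(consumed_ids)
    lost = sorted(i for i in set(produced_ids) if seen[i] == 0)
    duplicated = sorted(i for i, n in seen.items() if n > 1)
    return lost, duplicated


if __name__ == "__main__":
    # Hypothetical sample data: m2 was retried and stored twice, m3 never arrived.
    produced = ["m1", "m2", "m3", "m4"]
    consumed = ["m1", "m2", "m2", "m4"]
    lost, duplicated = find_lost_ids(produced, consumed)
    print("lost:", lost)
    print("duplicated:", duplicated)
```

Duplicates are reported separately because indefinite retries make them expected; only the "lost" list indicates a violation of the at-least-once guarantee.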

Of the roughly 460 million records we produced over 48 hours, we lost only 7. But I don't think we should have lost any. All of the lost records were produced at almost exactly the time one of the brokers was killed (down to the second, which is the granularity of our logs). Note that we're producing around 3k messages/second and we killed brokers many times over the 48-hour period. Only twice did we see data loss: once we lost 4 records and once we lost 3. I have checked the Kafka logs and there are some expected error messages from the surviving brokers that look like:


[2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId: ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [load_test,20] -> PartitionFetchInfo(521319,1048576),[load_test,74] -> PartitionFetchInfo(559017,1048576),[load_test,14] -> PartitionFetchInfo(420539,1048576),[load_test,0] -> PartitionFetchInfo(776869,1048576),[load_test,34] -> PartitionFetchInfo(446435,1048576),[load_test,94] -> PartitionFetchInfo(849943,1048576),[load_test,40] -> PartitionFetchInfo(241876,1048576),[load_test,80] -> PartitionFetchInfo(508778,1048576),[load_test,60] -> PartitionFetchInfo(81314,1048576),[load_test,54] -> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)

java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:465)
        at sun.nio.ch.Net.connect(Net.java:457)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
        at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
        at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
        at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

I have verified that all the partitions mentioned in these messages (e.g. the one above mentions partitions 0, 34, 94, etc.) had the newly killed node as the leader. I believe that means that the other 4 brokers were alive and running without issues. There are no other log messages that indicate any other broker communication issues.
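
One way to pull the partition list out of an "Error in fetch" line for this kind of cross-check is sketched below. It is a rough illustration only: the regular expression is an assumption based on the log format shown in this message, the function name partitions_in_fetch_error is hypothetical, and the sample string is a shortened excerpt of the log line above.

```python
import re

# Matches entries such as "[load_test,20] -> PartitionFetchInfo(521319,1048576)".
# The trailing "-> PartitionFetchInfo(" keeps the timestamp and thread-name
# brackets at the start of the log line from matching.
ENTRY = re.compile(r"\[([^,\]]+),(\d+)\] -> PartitionFetchInfo\((\d+),(\d+)\)")


def partitions_in_fetch_error(line):
    """Return a sorted list of (topic, partition) pairs named in one log line."""
    return sorted({(m.group(1), int(m.group(2))) for m in ENTRY.finditer(line)})


if __name__ == "__main__":
    # Shortened excerpt of the log line quoted in this message.
    sample = (
        "RequestInfo: [load_test,20] -> PartitionFetchInfo(521319,1048576),"
        "[load_test,74] -> PartitionFetchInfo(559017,1048576),"
        "[load_test,0] -> PartitionFetchInfo(776869,1048576)"
    )
    for topic, partition in partitions_in_fetch_error(sample):
        print(topic, partition)
```

The resulting pairs can then be compared by hand against whatever record of partition leadership is available for the time of the kill.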

As I understand it, this scenario shouldn't cause any data loss since at least 4/5 of the brokers were alive and healthy at all times. Is there any way to explain the data loss? Perhaps a known bug in 0.8.1?
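
For scale, the figures quoted earlier in this message can be checked with a few lines of arithmetic. This is only a sketch; the helper name loss_summary is hypothetical, and the inputs are the rounded numbers given above (roughly 460 million records, 7 lost, 48 hours).

```python
def loss_summary(produced, lost, hours):
    """Summarise a load test: average send rate and fraction of records lost."""
    seconds = hours * 3600
    return {
        "avg_rate_per_s": produced / seconds,
        "loss_fraction": lost / produced,
    }


if __name__ == "__main__":
    # Figures quoted in this message: ~460 million records, 7 lost, 48 hours.
    summary = loss_summary(produced=460_000_000, lost=7, hours=48)
    print(f"average rate: {summary['avg_rate_per_s']:.0f} messages/second")
    print(f"loss fraction: {summary['loss_fraction']:.2e}")
```

With these inputs the average rate comes out somewhat below the "about 3k messages/second" figure, and the lost fraction is on the order of 1e-8.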

Thanks,
Oliver


Re: data loss on replicated topic

Posted by Jun Rao <ju...@gmail.com>.
Ok. We do have a system test that hard kills the brokers. So far, we
haven't seen data loss there. However, it doesn't run very long and the
number of failures introduced is small. We will try to run a longer version
of that test and see if we can reproduce the issue locally.

Thanks,

Jun


On Fri, Mar 28, 2014 at 9:57 AM, Oliver Dain <od...@3cinteractive.com> wrote:

> Nope. I see lots of the "Error in fetch" messages, but that's all.
>
> It does sound roughly like KAFKA-1193 but I don't know how to verify if
> this is the same issue or not. Any ideas?

Re: data loss on replicated topic

Posted by Oliver Dain <od...@3cinteractive.com>.
Nope. I see lots of the "Error in fetch" messages, but that's all.

It does sound roughly like KAFKA-1193 but I don't know how to verify if
this is the same issue or not. Any ideas?


On 3/27/14, 8:24 PM, "Jun Rao" <ju...@gmail.com> wrote:

>We don't expect to lose data in that case. So, this sounds like a bug. Do
>you see any other error/warn in broker log around the time the data is
>lost?
>
>Thanks,
>
>Jun


Re: data loss on replicated topic

Posted by Jun Rao <ju...@gmail.com>.
We don't expect to lose data in that case. So, this sounds like a bug. Do
you see any other error/warn in broker log around the time the data is lost?

Thanks,

Jun


On Thu, Mar 27, 2014 at 10:52 AM, Oliver Dain <od...@3cinteractive.com> wrote:

> Hi Neha,
>
> Thanks for the reply. I do not see the "No broker in ISR" message. If my
> original diagnosis was correct (that there were at least 2 replicas alive
> for the topic at all times) then I believe this is expected, right? I
> gather this makes it more likely that we've hit KAFKA-1193? If so, is
> there any workaround and/or an ETA for a fix?
>
> Thanks,
> Oliver

Re: data loss on replicated topic

Posted by Oliver Dain <od...@3cinteractive.com>.
Hi Neha,

Thanks for the reply. I do not see the "No broker in ISR" message. If my
original diagnosis was correct (that there were at least 2 replicas alive
for the topic at all times) then I believe this is expected, right? I
gather this makes it more likely that we've hit KAFKA-1193? If so, is
there any workaround and/or an ETA for a fix?

Thanks,
Oliver




On 3/27/14, 5:18 AM, "Neha Narkhede" <ne...@gmail.com> wrote:

>It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
>see the following log line when you observe data loss -
>
>"No broker in ISR is alive for ... There's potential data loss."
>
>Thanks,
>Neha
>
>
>On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain
><od...@3cinteractive.com> wrote:
>
>> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
>> like it could be the cause of this. Does that sound right? Is there a
>>patch
>> we can test? Any date/time when this is expected to be fixed?
>>
>> From: New User <od...@3cinteractive.com>>
>> Date: Wednesday, March 26, 2014 at 11:59 AM
>> To: "users@kafka.apache.org<ma...@kafka.apache.org>" <
>> users@kafka.apache.org<ma...@kafka.apache.org>>
>> Subject: data loss on replicated topic
>>
>> My company currently testing Kafka for throughput and fault tolerance.
>> We've set up a cluster of 5 Kafka brokers and are publishing to a topic
>> with replication factor 3 and 100 partitions. We are publishing with
>> request.required.acks == -1 (e.g. All ISR replicas must ACK before the
>> message is considered sent). If a publication fails, we retry it
>> indefinitely until it succeeds. We ran a test over a weekend in which we
>> published messages as fast as we could (from a single publisher). Each
>> message has a unique ID so we can ensure that all messages are saved by
>> Kafka at least once at the end of the test. We have a simple script, run
>> via cron, that kills one broker (chosen at random) once every other hour
>> (killed via "kill -9"). The broker is then revived 16 minutes after it
>>was
>> killed. At the end of the weekend we ran a script to pull all data from
>>all
>> partitions and then verify that all messages were persisted by Kafka.
>>For
>> the most part, the results are very good. We can sustain about 3k
>> message/second with almost no data loss.
>>
>> Of the roughly 460 million records we produced over 48 hours we lost
>>only
>> 7 records. But, I don't think we should have lost any record. All of the
>> lost records were produced at almost exactly the time one of the brokers
>> was killed (down to the second which is the granularity of our logs).
>>Note
>> that we're producing around 3k messages/second and we killed brokers
>>many
>> times over the 48 hour period. Only twice did we see data loss: once we
>> lost 4 records and once we lost 3. I have checked the Kafka logs and
>>there
>> are some expected error messages from the surviving brokers that look
>>like:
>>
>>
>> [2014-03-19 02:21:12,088] ERROR [ReplicaFetcherThread-1-5], Error in
>>fetch
>> Name: FetchRequest; Version: 0; CorrelationId: 3491511; ClientId:
>> ReplicaFetcherThread-1-5; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1
>>bytes;
>> RequestInfo: [load_test,20] ->
>> PartitionFetchInfo(521319,1048576),[load_test,74] ->
>> PartitionFetchInfo(559017,1048576),[load_test,14] ->
>> PartitionFetchInfo(420539,1048576),[load_test,0] ->
>> PartitionFetchInfo(776869,1048576),[load_test,34] ->
>> PartitionFetchInfo(446435,1048576),[load_test,94] ->
>> PartitionFetchInfo(849943,1048576),[load_test,40] ->
>> PartitionFetchInfo(241876,1048576),[load_test,80] ->
>> PartitionFetchInfo(508778,1048576),[load_test,60] ->
>> PartitionFetchInfo(81314,1048576),[load_test,54] ->
>> PartitionFetchInfo(165798,1048576) (kafka.server.ReplicaFetcherThread)
>>
>> java.net.ConnectException: Connection refused
>>         at sun.nio.ch.Net.connect0(Native Method)
>>         at sun.nio.ch.Net.connect(Net.java:465)
>>         at sun.nio.ch.Net.connect(Net.java:457)
>>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
>>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>>         at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>>         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>>
>> I have verified that all the partitions mentioned in these messages (e.g.
>> The above mentions partitions 0, 34, 94, etc.) had the newly killed node as
>> the leader. I believe that means that the other 4 brokers were alive and
>> running without issues. There are no other log messages that indicate any
>> other broker communication issues.
>>
>> As I understand it, this scenario shouldn't cause any data loss since at
>> least 4/5 of the brokers were alive and healthy at all times. Is there any
>> way to explain the data loss? Perhaps a known bug in 0.8.1?
>>
>> Thanks,
>> Oliver
>>
>>


Re: data loss on replicated topic

Posted by Neha Narkhede <ne...@gmail.com>.
It is possible that you are hitting KAFKA-1193, but I'm not sure. Do you
see the following log line around the time you observe the data loss?

"No broker in ISR is alive for ... There's potential data loss."
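One way to check for that line is to scan the Kafka logs with a small script. The following is only a minimal sketch: the function names are hypothetical, the log file paths are whatever your installation uses, and the pattern assumes the "..." in the message above stands for arbitrary text on the same line.

```python
import re
import sys

# Matches the warning quoted above. The text between the two fixed parts is
# treated as arbitrary (an assumption about the exact wording of the message).
PATTERN = re.compile(r"No broker in ISR is alive for .*There's potential data loss")


def find_isr_warnings(lines):
    """Return a list of (line_number, text) for every line containing the warning."""
    hits = []
    for number, line in enumerate(lines, start=1):
        if PATTERN.search(line):
            hits.append((number, line.rstrip("\n")))
    return hits


def main(paths):
    """Print every matching line from the given log files, prefixed by file and line."""
    for path in paths:
        # errors="replace" keeps the scan going if a log contains odd bytes.
        with open(path, errors="replace") as handle:
            for number, text in find_isr_warnings(handle):
                print(f"{path}:{number}: {text}")


if __name__ == "__main__":
    # Log file paths are passed on the command line.
    main(sys.argv[1:])
```

Run it against the logs from each host and compare the timestamps of any matches with the times at which the missing records were produced.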

Thanks,
Neha


On Wed, Mar 26, 2014 at 12:05 PM, Oliver Dain <od...@3cinteractive.com> wrote:

> I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems
> like it could be the cause of this. Does that sound right? Is there a patch
> we can test? Any date/time when this is expected to be fixed?

Re: data loss on replicated topic

Posted by Oliver Dain <od...@3cinteractive.com>.
I just saw https://issues.apache.org/jira/browse/KAFKA-1193 which seems like it could be the cause of this. Does that sound right? Is there a patch we can test? Any date/time when this is expected to be fixed?
