Posted to users@kafka.apache.org by Chengwei Yang <ch...@gmail.com> on 2014/05/29 11:38:25 UTC
producer & consumer fail if the leader failed-over
Hi List,
I'm new to Kafka, so apologies if this has been asked before. I couldn't find
an answer by googling, so I'm asking here. Thanks in advance!
I'm following the Kafka quick start
http://kafka.apache.org/documentation.html#quickstart
and managed to set up a Kafka cluster with two brokers, connected to a
ZooKeeper service consisting of 3 ZooKeeper hosts.
The working state looks like this:
----------------------8<--------------- topic describe ----8<-------------
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
--------------------8<---------------- end ------------8<--------------
-------------------8<------------- console-producer.sh -------8<-------
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this is a test message
another test message
haha
--------------------8<---------------- end ------------8<--------------
-------------------8<------------- console-consumer.sh -------8<-------
$ ./bin/kafka-console-consumer.sh --zookeeper <my zk service here> --topic test --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
this is a test message
another test message
haha
-----------------8<---------------- end ----------------8<-----------
So far everything works. However, when I follow the quick start and
kill the leader broker for topic *test* (say the one with broker.id 0),
errors like the following appear:
-------------------8<------------- console-consumer.sh -------8<-------
[2014-05-29 17:05:26,446] WARN Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed. (kafka.consumer.SimpleConsumer)
[2014-05-29 17:05:26,460] ERROR [ConsumerFetcherThread-console-consumer-40784_<hostname>-1401354179291-21cd16e1-0-0], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 1401; ClientId: console-consumer-40784-ConsumerFetcherThread-console-consumer-40784_<hostname>-1401354179291-21cd16e1-0-0; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test,0] -> PartitionFetchInfo(0,1048576),[test,2] -> PartitionFetchInfo(0,1048576) (kafka.consumer.ConsumerFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
-----------------8<---------------- end ----------------8<-----------
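The fetch request in the log above carries RequestInfo for [test,0] and [test,2], which are exactly the partitions whose leader was broker 0 in the first describe output; those are the partitions that lose their fetch connection when broker 0 is killed. A minimal Python sketch, assuming the describe line format exactly as pasted in this thread (`partitions_by_leader` is a hypothetical helper, not part of Kafka), that groups partitions by their current leader:

```python
import re
from collections import defaultdict

# Matches one partition line of the describe output, e.g.
# "Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1"
# The summary line ("Topic:test PartitionCount:3 ...") does not match.
PARTITION_LINE = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+"
    r"Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def partitions_by_leader(describe_output):
    """Map each leader broker id to the sorted (topic, partition) pairs it leads."""
    leaders = defaultdict(list)
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if match is None:
            continue  # skip the topic summary line and anything unrecognised
        topic, partition, leader = match.group(1), int(match.group(2)), int(match.group(3))
        leaders[leader].append((topic, partition))
    return {broker: sorted(parts) for broker, parts in leaders.items()}


before = """\
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: test Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: test Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
"""

print(partitions_by_leader(before))
# {0: [('test', 0), ('test', 2)], 1: [('test', 1)]}
```

Broker 0's list matches the partitions named in the failing FetchRequest, while partition 1 (led by broker 1) is unaffected.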
And when I send another message using the console producer, it fails with
the same error: connection refused.
According to kafka-topics.sh, the leader has already failed over, as shown
below.
----------------------8<--------------- topic describe ----8<-------------
Topic:test PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: test Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1
Topic: test Partition: 2 Leader: 1 Replicas: 0,1 Isr: 1
-----------------8<---------------- end ----------------8<-----------
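Comparing the two describe outputs shows what the failover did: partitions 0 and 2 moved their leader from broker 0 to broker 1, and every ISR shrank to 1 because broker 0 is no longer in sync. A small Python sketch of that comparison, with both snapshots transcribed by hand into dictionaries (`diff_snapshots` is a hypothetical helper, not a Kafka tool):

```python
# partition -> (leader, replicas, isr), transcribed from the two describe outputs
before = {
    0: (0, [0, 1], [0, 1]),
    1: (1, [1, 0], [1, 0]),
    2: (0, [0, 1], [0, 1]),
}
after = {
    0: (1, [0, 1], [1]),
    1: (1, [1, 0], [1]),
    2: (1, [0, 1], [1]),
}


def diff_snapshots(old, new):
    """Return (leader_moves, under_replicated) between two snapshots.

    leader_moves: {partition: (old_leader, new_leader)} where the leader changed.
    under_replicated: sorted partitions whose ISR is smaller than the replica list.
    """
    leader_moves = {
        p: (old[p][0], new[p][0])
        for p in sorted(new)
        if p in old and old[p][0] != new[p][0]
    }
    under_replicated = sorted(
        p for p, (_, replicas, isr) in new.items() if len(isr) < len(replicas)
    )
    return leader_moves, under_replicated


moves, under = diff_snapshots(before, after)
print(moves)  # {0: (0, 1), 2: (0, 1)}
print(under)  # [0, 1, 2]
```

Note that Replicas still lists broker 0 for every partition: the replica assignment does not change on failover, only leadership and ISR membership do.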
However, according to the quick start, it shouldn't fail at all.
--
Thanks,
Chengwei
Re: producer & consumer fail if the leader failed-over
Posted by Jun Rao <ju...@gmail.com>.
That's just a warning and the producer will retry. Was there an error
(which means all retries failed)?
Thanks,
Jun
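Jun's distinction between a warning and an error can be illustrated with a simplified retry loop. This is a hedged Python model, not the code of Kafka's DefaultEventHandler: it assumes retry behaviour governed by the 0.8 producer settings message.send.max.retries and retry.backoff.ms (the values below are illustrative), and `send_with_retries`, `make_flaky_send` and `FailedToSend` are hypothetical names. Each failed attempt is only logged as a warning; an error is raised only after every attempt has failed.

```python
import logging
import time

log = logging.getLogger("producer-model")


class FailedToSend(Exception):
    """Raised only after every attempt has failed (the 'error' case)."""


def send_with_retries(send, message, max_retries=3, backoff_ms=100):
    """Try send(message) up to max_retries + 1 times in total.

    `send` is a hypothetical callable that raises OSError on a network failure
    (e.g. a broken pipe to a dead leader) and returns a broker id on success.
    """
    for attempt in range(max_retries + 1):
        try:
            return send(message)
        except OSError as exc:
            # A single failed attempt is only a warning: the producer will retry,
            # typically after refreshing metadata to locate the new leader.
            log.warning("attempt %d for %r failed: %s", attempt + 1, message, exc)
            if attempt < max_retries:
                time.sleep(backoff_ms / 1000.0)
    raise FailedToSend(f"failed to send {message!r} after {max_retries + 1} attempts")


def make_flaky_send(failures):
    """Hypothetical sender: fails `failures` times, then succeeds on broker 1."""
    state = {"calls": 0}

    def send(message):
        state["calls"] += 1
        if state["calls"] <= failures:
            raise BrokenPipeError("Broken pipe")
        return 1

    return send


# One failure (the dead leader) followed by success: only a warning is logged.
print(send_with_retries(make_flaky_send(1), "fine?", backoff_ms=0))  # prints 1
```

With this model, a WARN line followed by a successful delivery is the expected outcome during a failover; only FailedToSend corresponds to the "all retries failed" error Jun asks about.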
On Thu, May 29, 2014 at 6:06 PM, Chengwei Yang <ch...@gmail.com>
wrote:
Re: producer & consumer fail if the leader failed-over
Posted by Chengwei Yang <ch...@gmail.com>.
Hi Jun,
Thanks for your reply.
On Thu, May 29, 2014 at 07:52:11AM -0700, Jun Rao wrote:
> You need to pass in multiple brokers in --broker-list in the producer. To
> obtain the new leaders, the producer needs to talk to a live broker to get
> the new metadata.
I then tried with 3 brokers and put them all into
--broker-list as below, but it still fails with a broken pipe.
$ ./bin/kafka-console-producer.sh --broker-list kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092 --topic test-1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
ok
will kill leader broker.id=2
fail?
[2014-05-29 18:05:31,973] WARN Failed to send producer request with correlation id 8 to broker 2 with data for partitions [test-1,0] (kafka.producer.async.DefaultEventHandler)
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
at sun.nio.ch.IOUtil.write(IOUtil.java:148)
Another question: why did the consumer fail?
--
Thanks,
Chengwei
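Jun's point about --broker-list, quoted above, can be modeled in a few lines: the producer learns the new leaders only by asking some live broker for metadata, so a list containing nothing but the dead broker leaves it with nowhere to ask. A minimal Python sketch using a hypothetical in-memory cluster (`fetch_metadata` and the `cluster` dictionary are illustrative assumptions, not Kafka APIs; which host was killed and which took over are also assumed). It tries the brokers in list order, which is a simplification; the real client may choose the order differently.

```python
class NoBrokerAvailable(Exception):
    pass


def fetch_metadata(broker_list, cluster):
    """Ask each broker in a comma-separated list for topic metadata.

    `cluster` is a hypothetical dict mapping "host:port" to that broker's
    metadata, or to None if the broker is down. Returns the first answer
    obtained from a live broker.
    """
    for address in broker_list.split(","):
        metadata = cluster.get(address.strip())
        if metadata is not None:
            return metadata
    raise NoBrokerAvailable(f"no live broker in {broker_list!r}")


# Assumed state after the leader is killed: kafka-broker1 is down, and the
# surviving brokers report kafka-broker3 as the new leader of [test-1,0].
new_metadata = {("test-1", 0): "kafka-broker3:9092"}
cluster = {
    "kafka-broker1:9092": None,
    "kafka-broker2:9092": new_metadata,
    "kafka-broker3:9092": new_metadata,
}

print(fetch_metadata("kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092", cluster))
# {('test-1', 0): 'kafka-broker3:9092'}

try:
    fetch_metadata("kafka-broker1:9092", cluster)  # only the dead broker listed
except NoBrokerAvailable as exc:
    print(exc)  # no live broker in 'kafka-broker1:9092'
```

A longer list does not prevent the first failed send (the broken pipe above); it only ensures that the metadata refresh before the retry can succeed.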
>
> Thanks,
>
> Jun
>
>
> On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang
> <ch...@gmail.com>wrote:
Re: Message lost during leader fail-over
Posted by Chengwei Yang <ch...@gmail.com>.
On Thu, May 29, 2014 at 08:39:47PM -0700, Jun Rao wrote:
> Have you looked at request.required.acks in
> http://kafka.apache.org/documentation.html#producerconfigs ?
Thanks Jun, that makes sense to me.
--
Thanks,
Chengwei
>
> Thanks,
>
> Jun
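One plausible explanation of the lost "ok" message reported in this thread, sketched under stated assumptions: request.required.acks controls whether the producer waits for a reply (0 = do not wait, 1 = wait for the leader, -1 = wait for all in-sync replicas). Assuming the producer here ran with 0 (not confirmed in the thread), a message written to the socket of a leader that has just died can appear to succeed, so no retry is triggered; with 1 or -1 the missing acknowledgement counts as a failure and the message is retried against the new leader. The Python model below is a deliberately simplified simulation, not Kafka code; `DeadLeaderSocket`, `LiveLeader` and `produce` are hypothetical.

```python
class DeadLeaderSocket:
    """Hypothetical socket to a leader that has just died.

    Mimics common TCP behaviour: the first write after the peer is gone is
    accepted into the local send buffer, later writes fail with a broken pipe,
    and no response ever arrives.
    """

    def __init__(self):
        self.writes = 0

    def write(self, message):
        self.writes += 1
        if self.writes > 1:
            raise BrokenPipeError("Broken pipe")

    def read_ack(self):
        raise ConnectionResetError("no response from dead leader")


class LiveLeader:
    """Hypothetical new leader that stores every message and always acknowledges."""

    def __init__(self):
        self.log = []

    def write(self, message):
        self.log.append(message)

    def read_ack(self):
        return True


def produce(messages, acks):
    """Send messages in order, starting on the dead leader; return what the new leader stored."""
    old, new = DeadLeaderSocket(), LiveLeader()
    current = old
    for message in messages:
        while True:
            try:
                current.write(message)
                if acks != 0:
                    current.read_ack()  # acks=1 or -1: wait for the broker's reply
                break  # with acks=0 a successful write alone counts as "sent"
            except OSError:
                current = new  # metadata refresh: switch to the new leader and retry


    return new.log


print(produce(["ok", "fine?"], acks=0))  # ['fine?']
print(produce(["ok", "fine?"], acks=1))  # ['ok', 'fine?']
```

Even with acks=1 a message can still be lost if the leader fails after acknowledging but before a follower has copied it; -1 narrows that window as long as the ISR contains more than one replica.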
Re: Message lost during leader fail-over
Posted by Jun Rao <ju...@gmail.com>.
Have you looked at request.required.acks in
http://kafka.apache.org/documentation.html#producerconfigs ?
Thanks,
Jun
On Thu, May 29, 2014 at 6:43 PM, Chengwei Yang <ch...@gmail.com>
wrote:
Message lost during leader fail-over
Posted by Chengwei Yang <ch...@gmail.com>.
Hi Jun,
I think I was just scared by these exceptions: the producer and consumer are
still alive and seem to reconnect to another broker, even though they throw
a lot of exceptions.
However, I found another issue: if the producer writes a message just
after the leader broker is killed, that message is lost, i.e. messages
written during the fail-over period are lost.
See the logs below.
------------------8<----------- producer -------------8<-------------
$ ./bin/kafka-console-producer.sh --broker-list broker1:9092,broker2:9092,broker3:9092 --topic test-1
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
lalal
ok
[2014-05-30 09:33:40,415] WARN Failed to send producer request with correlation id 7 to broker 1 with data for partitions [test-1,0] (kafka.producer.async.DefaultEventHandler)
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
...
... exceptions output
...
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
fine?
the previous "ok" lost?
--------------8<----------- consumer -------------8<--------------
$ ./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic test-1 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
lalal
[2014-05-30 09:33:21,280] WARN Reconnect due to socket error: Received -1 when reading from channel, socket has likely been closed. (kafka.consumer.SimpleConsumer)
[2014-05-30 09:33:21,295] ERROR [ConsumerFetcherThread-console-consumer-94537_xulijian-mesos-online010-cqdx.qiyi.virtual-1401413557464-3a6250eb-0-1], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 406; ClientId: console-consumer-94537-ConsumerFetcherThread-console-consumer-94537_xulijian-mesos-online010-cqdx.qiyi.virtual-1401413557464-3a6250eb-0-1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [test-1,0] -> PartitionFetchInfo(10,1048576) (kafka.consumer.ConsumerFetcherThread)
java.net.ConnectException: Connection refused
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
...
... exceptions output
...
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
fine?
the previous "ok" lost?
--
Thanks,
Chengwei
On Thu, May 29, 2014 at 07:52:11AM -0700, Jun Rao wrote:
> You need to pass in multiple brokers in --broker-list in the producer. To
> obtain the new leaders, the producer needs to talk to a live broker to get
> the new metadata.
>
> Thanks,
>
> Jun
>
>
> On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang
> <ch...@gmail.com>wrote:
Re: producer & consumer fail if the leader failed-over
Posted by Jun Rao <ju...@gmail.com>.
You need to pass multiple brokers to --broker-list in the producer. To
obtain the new leaders, the producer needs to talk to a live broker to get
the new metadata.
Thanks,
Jun
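Jun's point can be checked before starting the producer: a metadata refresh only succeeds if at least one entry in --broker-list is still up. The following is a minimal bash sketch under that assumption. is_alive and first_live_broker are hypothetical helper names, not Kafka tools, and broker0/broker1 are placeholder host names standing in for the two brokers in this setup.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): given a comma-separated
# --broker-list value, print the first broker that accepts a TCP connection.
# Uses bash's /dev/tcp redirection; a host that silently drops packets
# (rather than refusing) may block until the OS connect timeout expires.

# Succeeds if host:port accepts a TCP connection.
is_alive() {
  local host="${1%:*}" port="${1##*:}"
  # Open fd 3 in a subshell so the socket is closed when the subshell exits.
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

# Prints the first reachable broker; returns 1 if none of them answers.
first_live_broker() {
  local IFS=','
  local broker
  for broker in $1; do
    if is_alive "$broker"; then
      echo "$broker"
      return 0
    fi
  done
  return 1
}

# Example (host names are placeholders for the two brokers):
#   first_live_broker "broker0:9092,broker1:9092"
#   ./bin/kafka-console-producer.sh --broker-list broker0:9092,broker1:9092 --topic test
```

With both brokers in the list, killing broker 0 still leaves broker 1 available for the producer to fetch metadata from, which is the situation Jun describes.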
On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang
<ch...@gmail.com> wrote:
Re: producer & consumer fail if the leader failed-over
Posted by Chengwei Yang <ch...@gmail.com>.
Hi Robert,
Attached is the server.log.
--
Thanks,
Chengwei
On Thu, May 29, 2014 at 07:22:06AM -0700, Robert Hodges wrote:
> Hi Chengwei,
>
> What do you see in the Kafka server logs? This may help you diagnose the
> failure.
>
> Cheers, Robert
>
>
> On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang <ch...@gmail.com>
> wrote:
Re: producer & consumer fail if the leader failed-over
Posted by Robert Hodges <be...@gmail.com>.
Hi Chengwei,
What do you see in the Kafka server logs? This may help you diagnose the
failure.
Cheers, Robert
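Following Robert's suggestion, one quick way to skim a broker log around the failover is to keep only its WARN and ERROR entries. A minimal sketch, assuming the broker log uses the same "[timestamp] LEVEL message (logger)" layout as the console output in this thread and lives at the default logs/server.log under the Kafka directory; log_problems is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): print only the WARN and ERROR
# entries of a log whose lines start with "[timestamp] LEVEL ", the layout
# seen in the console output in this thread. Stack-trace continuation lines
# carry no level prefix, so they are not printed.
# Reads the given files, or standard input when no file is given.
log_problems() {
  grep -E '^\[[^]]+\] (WARN|ERROR) ' "$@"
}

# Example (assumes the default logs/server.log under the Kafka directory):
#   log_problems logs/server.log
```

Because continuation lines are dropped, this only gives the headline of each problem; the full stack trace still has to be read from the log itself.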
On Thu, May 29, 2014 at 2:38 AM, Chengwei Yang <ch...@gmail.com>
wrote: