Posted to users@kafka.apache.org by Dennis Haller <dh...@talemetry.com> on 2013/07/08 19:25:46 UTC

Kafka issue with "Reconnect due to socket error"

I have a 4-broker Kafka system running in Amazon EC2, and we are using
Kafka 0.8 beta1. Most of the default configurations remain unchanged.
Running the Kafka tool ConsumerOffsetChecker causes socket errors. Some of
these socket reset errors also appear in the Kafka server log.

Usually the first few topics are printed, but then every subsequent one
causes a socket error. The entire command completes in only 3 or 4 seconds,
so I don't know where the timeouts are coming from.

Do you have any suggestions?


Here is the log:



bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group ArchivingConsumer --zkconnect ec2-23-22-34-191.compute-1.amazonaws.com:2181
[2013-07-08 10:14:33,172] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-07-08 10:14:33,186] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:host.name=ip-10-41-3-33.ec2.internal (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.version=1.7.0_21 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,186] INFO Client environment:java.home=/usr/lib/jvm/java-7-openjdk-amd64/jre (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.class.path=:bin/../core/target/scala-2.8.0/kafka_2.8.0-0.8.0-beta1.jar:bin/../core/target/scala-2.8.0/kafka-assembly-0.8.0-beta1-deps.jar:bin/../perf/target/scala-2.8.0/kafka-perf_2.8.0-0.8.0-beta1.jar:bin/../libs/*.jar:bin/../kafka*.jar (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/jni:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,187] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:os.version=3.2.0-32-virtual (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.name=ubuntu (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.home=/home/ubuntu (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,188] INFO Client environment:user.dir=/opt/kafka/kafka-0.8 (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,189] INFO Initiating client connection, connectString=ec2-23-22-34-191.compute-1.amazonaws.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@737c45ee (org.apache.zookeeper.ZooKeeper)
[2013-07-08 10:14:33,216] INFO Opening socket connection to server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,226] INFO Socket connection established to ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,300] INFO Session establishment complete on server ec2-23-22-34-191.compute-1.amazonaws.com/10.122.169.44:2181, sessionid = 0x13fb0932c3a002a, negotiated timeout = 30000 (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:33,302] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
Group             Topic                           Pid Offset logSize Lag Owner
ArchivingConsumer qa-M-Candidate-CrmStatus-Events 0   30     30      0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer qa-M-Match                      0   36     36      0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer qa-M-friday-01                  0   100    100     0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
ArchivingConsumer qa-M-test-01                    0   200    200     0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,223] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)

. . . repeated for each topic . . .

ArchivingConsumer qa-f1-Match                     0   14     14      0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,906] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:89)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:125)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:72)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:90)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:89)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:154)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
    at scala.collection.immutable.List.foreach(List.scala:45)
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
ArchivingConsumer qa-f3-T-Campaign-Email          0   1      1       0   ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0
[2013-07-08 10:14:35,918] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-07-08 10:14:35,923] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2013-07-08 10:14:35,923] INFO Session: 0x13fb0932c3a002a closed (org.apache.zookeeper.ZooKeeper)
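
For readers who want to check the table rows programmatically, here is a minimal sketch that parses one ConsumerOffsetChecker row and confirms that Lag equals logSize minus Offset. It assumes the seven whitespace-separated columns shown in the output above; the names OffsetRow and parse_offset_row are illustrative and not part of Kafka.

```python
from typing import NamedTuple


class OffsetRow(NamedTuple):
    """One row of ConsumerOffsetChecker output (hypothetical helper type)."""
    group: str
    topic: str
    partition: int
    offset: int
    log_size: int
    lag: int
    owner: str


def parse_offset_row(line: str) -> OffsetRow:
    """Parse a row, assuming seven whitespace-separated columns."""
    group, topic, pid, offset, log_size, lag, owner = line.split()
    return OffsetRow(group, topic, int(pid), int(offset),
                     int(log_size), int(lag), owner)


# Row copied from the output above, rejoined onto a single line.
row = parse_offset_row(
    "ArchivingConsumer qa-M-test-01 0 200 200 0 "
    "ArchivingConsumer_ip-10-121-10-80.ec2.internal-1373285289428-6aeafed6-0"
)

# The consumer is caught up when the lag (log size minus offset) is zero.
assert row.lag == row.log_size - row.offset
print(row.topic, row.lag)
```

Every row in the output reports a lag of 0, so the consumer group is caught up despite the socket errors.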

Re: Kafka issue with "Reconnect due to socket error"

Posted by Jun Rao <ju...@gmail.com>.
The reconnect is on the socket connection to the Kafka broker. Technically,
the reconnect worked, so the tool still produces its output. The reconnection
is odd, though. Do you see the same reconnection when you run the console
consumer?

Thanks,

Jun
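
The "reconnect worked" behavior can be pictured as a retry-once pattern: send, and on a socket error log it, reconnect, and send again. The sketch below is an illustration only, not Kafka's actual SimpleConsumer code. FlakyChannel, ClosedChannelError, and send_with_one_reconnect are hypothetical stand-ins.

```python
class ClosedChannelError(IOError):
    """Stand-in for java.nio.channels.ClosedChannelException."""


class FlakyChannel:
    """Hypothetical channel whose first connection was closed by the peer."""

    def __init__(self) -> None:
        self.connects = 0

    def connect(self) -> None:
        self.connects += 1

    def send(self, request: str) -> str:
        # Simulate the broker having closed the first connection.
        if self.connects == 1:
            raise ClosedChannelError("channel closed")
        return f"response to {request}"


def send_with_one_reconnect(channel: FlakyChannel, request: str) -> str:
    """Send a request; on a socket error, reconnect once and retry."""
    try:
        return channel.send(request)
    except IOError as err:
        print(f"Reconnect due to socket error: {err!r}")
        channel.connect()
        # A second failure is not caught and propagates to the caller.
        return channel.send(request)


channel = FlakyChannel()
channel.connect()
result = send_with_one_reconnect(channel, "offsets")
print(result)
```

Under this pattern the caller still gets a response after one logged error, which matches the tool printing a row after each stack trace.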



Re: Kafka issue with "Reconnect due to socket error"

Posted by Dennis Haller <dh...@talemetry.com>.
Hi,

Thanks for your reply.

I can't see anything in the broker logs, but this keeps coming up in
the ZooKeeper log (instance 1 out of 3):
WARN  [SyncThread:1:FileTxnLog@321] - fsync-ing the write ahead log in
SyncThread:1 took 1298ms which will adversely effect operation latency. See
the ZooKeeper troubleshooting guide
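
To see how often this warning occurs and how slow the syncs are, the durations can be pulled out of the ZooKeeper log. This is a minimal sketch that assumes the warning text shown above; the function name slow_fsyncs and the threshold_ms parameter are illustrative choices, not ZooKeeper settings.

```python
import re
from typing import List

# Matches the warning text quoted above and captures the duration.
FSYNC_WARNING = re.compile(
    r"fsync-ing the write ahead log in\s+\S+\s+took\s+(\d+)ms"
)


def slow_fsyncs(log_text: str, threshold_ms: int = 1000) -> List[int]:
    """Return fsync durations (ms) at or above the hypothetical threshold."""
    durations = [int(ms) for ms in FSYNC_WARNING.findall(log_text)]
    return [ms for ms in durations if ms >= threshold_ms]


sample = (
    "WARN  [SyncThread:1:FileTxnLog@321] - fsync-ing the write ahead log in "
    "SyncThread:1 took 1298ms which will adversely effect operation latency."
)
print(slow_fsyncs(sample))
```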


I'm not sure whether the connection error comes from ZooKeeper or from a
Kafka broker. In an edited version of the log file, you can see that a
normal cycle of ConsumerOffsetChecker requires 5 ZK queries before the stats
are printed, but the exception is thrown after 4 queries have completed,
somewhere in the 5th query:


[2013-07-09 10:45:41,845] INFO Client environment:user.dir=/opt/kafka/kafka-0.8 (org.apache.zookeeper.ZooKeeper)
[2013-07-09 10:45:41,846] INFO Initiating client connection, connectString=ec2-23-22-34-191.compute-1.amazonaws.com sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@3c0db454 (>
[2013-07-09 10:45:41,848] DEBUG zookeeper.disableAutoWatchReset is false (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:41,934] DEBUG Awaiting connection to Zookeeper server (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:41,934] DEBUG Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:41,934] INFO Opening socket connection to server ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:42,020] INFO Socket connection established to ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:42,024] DEBUG Session establishment request sent on ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181 (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:42,114] INFO Session establishment complete on server ec2-23-22-34-191.compute-1.amazonaws.com/23.22.34.191:2181, sessionid = 0x13fc464f0420009, negotiated timeout = >
[2013-07-09 10:45:42,118] DEBUG Received event: WatchedEvent state:SyncConnected type:None path:null (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:42,118] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:42,118] DEBUG Leaving process event (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:42,118] DEBUG State is SyncConnected (org.I0Itec.zkclient.ZkClient)
[2013-07-09 10:45:42,265] DEBUG Reading reply sessionid:0x13fc464f0420009, packet:: clientPath:null serverPath:null finished:false header:: 1,8  replyHeader:: 1,12884971368,0  request:>
Group           Topic                          Pid Offset logSize Lag Owner

[2013-07-09 10:45:42,394] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events,F  (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:42,684] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/offsets/qa-M-Candidate-CrmStage-Events/0,F  (org.apache.zookeeper>
[2013-07-09 10:45:42,768] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/owners/qa-M-Candidate-CrmStage-Events/0,F  (org.apache.zookeeper.>
[2013-07-09 10:45:42,849] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events/partitions/0/state,F  (org.apache.zookeep>
[2013-07-09 10:45:42,971] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/ids/2,F  (org.apache.zookeeper.ClientCnxn)
KafkaMirror     qa-M-Candidate-CrmStage-Events 0   15     15      0   KafkaMirror_ip-10-41-3-33.ec2.internal-1373310093967-e20aba69-0

[2013-07-09 10:45:44,061] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Match,F  (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:44,150] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/offsets/qa-M-Match/0,F  (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:44,233] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/consumers/KafkaMirror/owners/qa-M-Match/0,F  (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:44,314] DEBUG Reading reply sessionid:0x13fc464f0420009, ..........request:: '/brokers/topics/qa-M-Match/partitions/0/state,F  (org.apache.zookeeper.ClientCnxn)
[2013-07-09 10:45:44,329] INFO Reconnect due to socket error: (kafka.consumer.SimpleConsumer)
java.nio.channels.ClosedChannelException
.........
    at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:153)
    at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
KafkaMirror     qa-M-Match                     0   76     76      0   KafkaMirror_ip-10-41-3-33.ec2.internal-1373310093967-e20aba69-0
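
The ZooKeeper reads in the excerpt above can be tallied per topic to make the 5-versus-4 pattern explicit. This is a minimal sketch, assuming the `request:: '<path>,F` format seen in the DEBUG lines; the function name reads_per_topic is illustrative, and the sample input contains only the request fragments copied from the excerpt.

```python
import re
from typing import List, Tuple

# Path inside a DEBUG "Reading reply" line: request:: '<path>,F
REQUEST_PATH = re.compile(r"request::\s+'([^,']+),F")
# A read of /brokers/topics/<topic> marks the start of a new topic.
TOPIC_ROOT = re.compile(r"^/brokers/topics/([^/]+)$")


def reads_per_topic(log_text: str) -> List[Tuple[str, int]]:
    """Count logged ZooKeeper reads for each topic, in log order."""
    counts: List[Tuple[str, int]] = []
    for path in REQUEST_PATH.findall(log_text):
        root = TOPIC_ROOT.match(path)
        if root:
            counts.append((root.group(1), 1))
        elif counts:
            topic, n = counts[-1]
            counts[-1] = (topic, n + 1)
    return counts


LOG = """
request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events,F
request:: '/consumers/KafkaMirror/offsets/qa-M-Candidate-CrmStage-Events/0,F
request:: '/consumers/KafkaMirror/owners/qa-M-Candidate-CrmStage-Events/0,F
request:: '/brokers/topics/qa-M-Candidate-CrmStage-Events/partitions/0/state,F
request:: '/brokers/ids/2,F
request:: '/brokers/topics/qa-M-Match,F
request:: '/consumers/KafkaMirror/offsets/qa-M-Match/0,F
request:: '/consumers/KafkaMirror/owners/qa-M-Match/0,F
request:: '/brokers/topics/qa-M-Match/partitions/0/state,F
"""

tally = reads_per_topic(LOG)
print(tally)
```

In this excerpt the first topic shows five reads, ending with /brokers/ids/2, while the second shows four before the socket error is logged.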



On Mon, Jul 8, 2013 at 8:56 PM, Jun Rao <ju...@gmail.com> wrote:

> The consumer reconnects because the broker closed the socket. Any
> error/exception on the broker side around the same time?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 10:25 AM, Dennis Haller <dhaller@talemetry.com
> >wrote:
>
> > I have a 4-broker Kafka system running in Amazon EC2,  and we are using
> > Kafka 0.8 beta1. Most of the standard default configurations remain
> > unchanged. Running the kafka tool ConsumerOffsetChecker is causing socket
> > errors to occur. Some of these socket reset errors are also in the kafka
> > server log.
> >
> >  Usually the first few message topics are printed, but then every
> > subsequent one is causing a socket error. The entire command completes in
> > only 3 or 4 seconds, so I don't know where the timeouts are coming from.
> >
> > Do you have any suggestions?

Re: Kafka issue with "Reconnect due to socket error"

Posted by Jun Rao <ju...@gmail.com>.
The consumer reconnects because the broker closed the socket. Are there any
errors or exceptions in the broker log around the same time?
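
One way to look for broker-side errors near the reconnect is to filter the
broker log by timestamp and level. The sketch below is only an illustration:
the function name broker_events_near, the 5-second window, and the chosen
levels are assumptions, and it presumes the broker log uses the same
"[yyyy-MM-dd HH:mm:ss,SSS] LEVEL ..." layout as the client output and that
both hosts' clocks are in the same time zone and roughly in sync.

```python
import re
from datetime import datetime, timedelta

# Matches the layout seen in the client output, e.g.
# "[2013-07-08 10:14:35,223] INFO Reconnect due to socket error: ..."
LINE_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\] (\w+) ")


def broker_events_near(lines, when, window_seconds=5,
                       levels=("WARN", "ERROR", "FATAL")):
    """Return log lines at the given levels within +/- window_seconds of `when`.

    `lines` is any iterable of strings (an open file works). Lines without a
    leading timestamp, such as stack-trace continuation lines, are skipped.
    """
    window = timedelta(seconds=window_seconds)
    hits = []
    for line in lines:
        match = LINE_RE.match(line)
        if not match:
            continue
        stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
        if match.group(2) in levels and abs(stamp - when) <= window:
            hits.append(line.rstrip("\n"))
    return hits
```

To use it, open the broker's log file (its path depends on the broker's log4j
configuration) and pass the file object together with
datetime(2013, 7, 8, 10, 14, 35), the time of the first reconnect in the
output quoted below.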

Thanks,

Jun


On Mon, Jul 8, 2013 at 10:25 AM, Dennis Haller <dh...@talemetry.com> wrote:

> I have a 4-broker Kafka system running in Amazon EC2,  and we are using
> Kafka 0.8 beta1. Most of the standard default configurations remain
> unchanged. Running the kafka tool ConsumerOffsetChecker is causing socket
> errors to occur. Some of these socket reset errors are also in the kafka
> server log.
>
>  Usually the first few message topics are printed, but then every
> subsequent one is causing a socket error. The entire command completes in
> only 3 or 4 seconds, so I don't know where the timeouts are coming from.
>
> Do you have any suggestions?