You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jian Lin (JIRA)" <ji...@apache.org> on 2017/06/14 13:04:00 UTC
[jira] [Comment Edited] (KAFKA-5432) producer and consumer
SocketTimeoutException
[ https://issues.apache.org/jira/browse/KAFKA-5432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049149#comment-16049149 ]
Jian Lin edited comment on KAFKA-5432 at 6/14/17 1:03 PM:
----------------------------------------------------------
[~huxi_2b]
look at the log
consumer
{code:java}
2017-06-12 10:45:47[sms-consumer-pool-2-thread:602809565]-[INFO] receive message:XXXXXXXXXXX
....
2017-06-12 10:45:47[sms-consumer-pool-2-thread:602809598]-[INFO] time cost. get temp:2 ms,http:26 ms,save log:5 ms /*this means nomal*/
2017-06-12 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO] Reconnect due to socket error: java.net.SocketTimeoutException /*error occurred*/
{code}
producer:
{code:java}
2017-06-12 10:45:47[qtp958382397-79:591409620]-[INFO] send message success /*this means nomal*/
2017-06-12 10:45:51[qtp958382397-80:591413798]-[INFO] send start,XXXXX
.... /*write some message to kafka,but not get feedback util it occurred timeout error*/
2017-06-12 10:46:01[qtp958382397-80:591423836]-[INFO] Disconnecting from 10.17.24.176:9092 /*kafka write message default timeout is 10s, so at 10:46:01 it occcurred error.*/
2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer request with correlation id 234645 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
{code}
I know it might unrelated to kafka rolled log, but the time point *2017-06-12 10:45:51,425* is too coincidentally.
Yesterday, I restart my kafka cluster and set the log.roll.ms=120000.Then test write message and consume, it performs normally when the kafka rolled the log, and it not have many CLOSE_WAITs on my kafka server.
It likely related to those CLOSE_WAITs,because when I restart kafka server.It releases those CLOSE_WAITs connection.Then I can produce and consume message normally.
But I am not sure about it,actually it second time I encountered this problem.
First time, it also ran normally for a week, then thrown SocketTimeoutException.But I haven't save the detail error log.Just keep
{code:java}
2017-05-31 11:42:44[qtp958382397-18:45744]-[WARN] Fetching topic metadata with correlation id 0 for topics [Set(sms)] from broker [id:0,host:10.17.24.175,port:9092] failed
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
{code}
was (Author: jian lin):
[~huxi_2b]
look at the log
consumer
{code:java}
2017-06-12 10:45:47[sms-consumer-pool-2-thread:602809565]-[INFO] receive message:XXXXXXXXXXX
....
2017-06-12 10:45:47[sms-consumer-pool-2-thread:602809598]-[INFO] time cost. get temp:2 ms,http:26 ms,save log:5 ms /*this means nomal*/
2017-06-12 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO] Reconnect due to socket error: java.net.SocketTimeoutException /*error occurred*/
{code}
producer:
{code:java}
2017-06-12 10:45:47[qtp958382397-79:591409620]-[INFO] send message success /*this means nomal*/
2017-06-12 10:45:51[qtp958382397-80:591413798]-[INFO] send start,XXXXX
.... /*write some message to kafka,but not get feedback util it occurred timeout error*/
2017-06-12 10:46:01[qtp958382397-80:591423836]-[INFO] Disconnecting from 10.17.24.176:9092 /*kafka write message default timeout is 10s, so at 10:46:01 it occcurred error.*/{color}
2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer request with correlation id 234645 to broker 176 with data for partitions [sms,3]
java.net.SocketTimeoutException
{code}
I know it might unrelated to kafka rolled log, but the time point *2017-06-12 10:45:51,425* is too coincidentally.
Yesterday, I restart my kafka cluster and set the log.roll.ms=120000.Then test write message and consume, it performs normally when the kafka rolled the log, and it not have many CLOSE_WAITs on my kafka server.
It likely related to those CLOSE_WAITs,because when I restart kafka server.It releases those CLOSE_WAITs connection.Then I can produce and consume message normally.
But I am not sure about it,actually it second time I encountered this problem.
First time, it also ran normally for a week, then thrown SocketTimeoutException.But I haven't save the detail error log.Just keep
{code:java}
2017-05-31 11:42:44[qtp958382397-18:45744]-[WARN] Fetching topic metadata with correlation id 0 for topics [Set(sms)] from broker [id:0,host:10.17.24.175,port:9092] failed
java.net.SocketTimeoutException
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
{code}
> producer and consumer SocketTimeoutException
> ---------------------------------------------
>
> Key: KAFKA-5432
> URL: https://issues.apache.org/jira/browse/KAFKA-5432
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 0.10.2.0
> Environment: os:Red Hat 4.4.7-17
> java:
> java version "1.8.0_131"
> Java(TM) SE Runtime Environment (build 1.8.0_131-b11)
> Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode)
> Reporter: Jian Lin
> Attachments: server.properties
>
>
> Hey all, I met a strange problem, hope someone can help me.
> The program ran normally for a week, and I did not do any changes, but today it reported a mistake suddenly
> Producer error log:
> {code:java}
> 2017-06-12 10:46:01[qtp958382397-80:591423838]-[WARN] Failed to send producer request with correlation id 234645 to broker 176 with data for partitions [sms,3]
> java.net.SocketTimeoutException
> at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:229)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
> at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
> at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
> at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> {code}
> Consumer error log:
> {code:java}
> 2017-06-12 10:46:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602874523]-[INFO] Reconnect due to socket error: java.net.SocketTimeoutException
> 2017-06-12 10:47:22[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602904544]-[WARN] [ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 6060231; ClientId: sms-consumer-group1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [sms,0] -> PartitionFetchInfo(81132,1048576),[sms,3] -> PartitionFetchInfo(94040,1048576). Possible cause: java.net.SocketTimeoutException
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO] Verifying properties
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO] Property client.id is overridden to sms-consumer-group1
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO] Property metadata.broker.list is overridden to 10.17.24.175:9092,10.17.24.176:9092,10.17.24.177:9092
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO] Property request.timeout.ms is overridden to 30000
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904576]-[INFO] Fetching metadata from broker id:177,host:10.17.24.177,port:9092 with correlation id 1 for 1 topic(s) Set(sms)
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904577]-[INFO] Connected to 10.17.24.177:9092 for producing
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904578]-[INFO] Disconnecting from 10.17.24.177:9092
> 2017-06-12 10:47:22[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602904579]-[INFO] Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 2017-06-12 10:47:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602904579]-[INFO] [ConsumerFetcherManager-1496632739747] Added fetcher for partitions ArrayBuffer([[sms,0], initOffset 81132 to broker id:176,host:10.17.24.176,port:9092] , [[sms,3], initOffset 94040 to broker id:176,host:10.17.24.176,port:9092] )
> 2017-06-12 10:47:52[ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176:602934581]-[WARN] [ConsumerFetcherThread-sms-consumer-group1_zw_78_64-1496632739724-69516149-0-176], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 6060232; ClientId: sms-consumer-group1; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [sms,0] -> PartitionFetchInfo(81132,1048576),[sms,3] -> PartitionFetchInfo(94040,1048576). Possible cause: java.net.SocketTimeoutException
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934600]-[INFO] Verifying properties
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934600]-[INFO] Property client.id is overridden to sms-consumer-group1
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934600]-[INFO] Property metadata.broker.list is overridden to 10.17.24.175:9092,10.17.24.176:9092,10.17.24.177:9092
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934600]-[INFO] Property request.timeout.ms is overridden to 30000
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934600]-[INFO] Fetching metadata from broker id:176,host:10.17.24.176,port:9092 with correlation id 2 for 1 topic(s) Set(sms)
> 2017-06-12 10:47:52[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602934601]-[INFO] Connected to 10.17.24.176:9092 for producing
> 2017-06-12 10:48:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964602]-[INFO] Disconnecting from 10.17.24.176:9092
> 2017-06-12 10:48:22[sms-consumer-group1_zw_78_64-1496632739724-69516149-leader-finder-thread:602964627]-[WARN] Fetching topic metadata with correlation id 2 for topics [Set(sms)] from broker [id:176,host:10.17.24.176,port:9092] failed
> java.net.SocketTimeoutException
> at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> at kafka.utils.Utils$.read(Utils.scala:380)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
> at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> {code}
> I check the kafka server.log, it just print one message *[2017-06-12 10:45:51,425] INFO Rolled new log segment for 'sms-3' in 2 ms. (kafka.log.Log)*
> ----
> The kafka version that i used is *2.10-0.10.2.0*, and on client side is *kafka_2.9.2-0.8.2.1.jar*.
> attach file is my kafka config, my kafka cluster has three broker 175,176,177.
> topic info:
> {code:java}
> Topic:sms PartitionCount:6 ReplicationFactor:2 Configs:
> Topic: sms Partition: 0 Leader: 176 Replicas: 176,177 Isr: 176
> Topic: sms Partition: 1 Leader: 177 Replicas: 177,175 Isr: 177
> Topic: sms Partition: 2 Leader: 175 Replicas: 175,176 Isr: 175
> Topic: sms Partition: 3 Leader: 176 Replicas: 176,175 Isr: 176
> Topic: sms Partition: 4 Leader: 177 Replicas: 177,176 Isr: 177
> Topic: sms Partition: 5 Leader: 175 Replicas: 175,177 Isr: 175
> {code}
> ----update at 2017-06-13
> I found the broker 176 has too many TCP CLOSE_WAIT
> {code:java}
> Active Internet connections (w/o servers)
> Proto Recv-Q Send-Q Local Address Foreign Address State
> tcp 0 0 10.17.24.176:ssh 10.10.52.171:54275 ESTABLISHED
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:54189 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:48538 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:55449 CLOSE_WAIT
> tcp 0 0 ::ffff:10.17.24.176:45856 ::ffff:10.17.24.17:eforward ESTABLISHED
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:7817 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:26435 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:46790 CLOSE_WAIT
> tcp 36 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:54199 CLOSE_WAIT
> tcp 36 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:42725 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:50994 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:37867 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:12582 CLOSE_WAIT
> tcp 36 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:23577 CLOSE_WAIT
> tcp 0 0 ::ffff:10.17.24.17:eforward ::ffff:10.17.24.175:32890 ESTABLISHED
> tcp 36 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:41288 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:25335 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:17124 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:41103 CLOSE_WAIT
> tcp 48 0 ::ffff:10.17.2:XmlIpcRegSvc ::ffff:10.10.78.64:58923 CLOSE_WAIT
> {code}
> TIME_WAIT 1
> CLOSE_WAIT 389
> ESTABLISHED 8
> The 10.10.78.64 is my consumer's ip.
> In addition,at this server, I run *bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker -zookeeper zk -topic topic -group group* every minute, to monitor the topic offset value.
> I do not know whether the reason for the timeout is related to this.
> And when I restart kafka cluster, it recover nomal, the TCP CLOSE_WAIT are disappear.And I can produce and consume normally.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)