Posted to users@kafka.apache.org by Json Tu <ka...@126.com> on 2016/10/25 12:32:18 UTC

handleFetchRequest throw exception

Hi all,
	I use Kafka 0.9.0.0 on a cluster with 6 nodes. When I restart a broker, we see many log entries like the one below:

[2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] -> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25] -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5] -> PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] 
-> PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2] -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] -> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24] -> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18] -> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36] -> PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
kafka.common.NotLeaderForPartitionException: Leader not local for partition [retail.d.ris.spider.request,1] on broker 2141642
        at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
        at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
        at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
        at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:745)


What confuses me is that partition [retail.d.ris.spider.request,1] is not contained in this request (only partition 0 of that topic is), so why is it logged in handleFetchRequest? How can this happen, and how can it be resolved?




Re: handleFetchRequest throw exception

Posted by Json Tu <ka...@126.com>.
Thanks, Guozhang.
Following your suggestions, I found that my own patch to Kafka 0.9.0.0 may have caused the problem.
In DelayedFetch.scala, IntelliJ's auto-import added import org.apache.kafka.common.errors.NotLeaderForPartitionException instead of import kafka.common.NotLeaderForPartitionException,
so the kafka.common.NotLeaderForPartitionException thrown inside getLeaderReplicaIfLocal cannot be caught by tryComplete() and propagates all the way up to handle(). I think this may be the cause of the repeated error logs and the other strange behavior.
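
The import mix-up described above can be reproduced in isolation. The following is a minimal sketch, not Kafka code: the objects oldCommon and newErrors are hypothetical stand-ins for the packages kafka.common and org.apache.kafka.common.errors, and getLeaderReplicaIfLocal here is a stub that always throws. It only illustrates that a catch clause matches on the class actually imported, not on the simple name.

```scala
object ImportMismatchDemo {
  // Hypothetical stand-in for the package kafka.common.
  object oldCommon {
    class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
  }

  // Hypothetical stand-in for the package org.apache.kafka.common.errors.
  object newErrors {
    class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
  }

  // Stub playing the role of getLeaderReplicaIfLocal: it always throws
  // the exception class from the "old" package.
  def getLeaderReplicaIfLocal(): Nothing =
    throw new oldCommon.NotLeaderForPartitionException("Leader not local")

  // The catch clause refers to the class from the wrong package, so the
  // pattern does not match and the exception propagates to the caller.
  def tryCompleteWithWrongImport(): String = {
    import newErrors.NotLeaderForPartitionException
    try getLeaderReplicaIfLocal()
    catch { case _: NotLeaderForPartitionException => "caught" }
  }

  // With the matching import, the same catch clause handles the exception.
  def tryCompleteWithRightImport(): String = {
    import oldCommon.NotLeaderForPartitionException
    try getLeaderReplicaIfLocal()
    catch { case _: NotLeaderForPartitionException => "caught" }
  }

  def main(args: Array[String]): Unit = {
    println(tryCompleteWithRightImport())
    println(scala.util.Try(tryCompleteWithWrongImport()).isFailure)
  }
}
```

Both classes share a simple name but are unrelated types, which is why the code compiles without complaint and only misbehaves at run time.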






Re: handleFetchRequest throw exception

Posted by Guozhang Wang <wa...@gmail.com>.
Json,

As you mentioned yourself, the "NotLeaderForPartitionException" thrown
from getLeaderReplicaIfLocal should be caught in the end, so I'm not sure
why the reported stack trace ("ERROR: ...") shows the
NotLeaderForPartitionException propagating out of "tryComplete". I have
also checked the source code of both 0.9.0.0 and 0.8.2.2, and their line
numbers do not match those in the reported stack trace (e.g.
DelayedFetch.scala:72), so I cannot really tell why you would ever see the
error message instead of the DEBUG-level "Broker is no longer the leader
of %s, satisfy %s immediately..".

Following the 0.9.0.0 source code, since the NotLeaderForPartitionException
is caught and the delayed fetch request is forced to complete and be sent
back with some potential error code, it will not prevent the replica's
fetch request from returning successfully to the fetching broker, and hence
should not lead producers / consumers to fail for a long time. Similarly,
since we force-complete those delayed fetch requests as well, it should not
cause a spam of repeated error log entries: it should print at most one
entry (and that should be DEBUG, not ERROR) for each delayed request whose
partition leaders have migrated out.
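
The "at most one entry per delayed request" argument can be sketched as below. This is a simplified model under stated assumptions, not Kafka's actual DelayedFetch: DelayedFetchSketch, isLeaderLocal and responsesSent are hypothetical names, and the compare-and-set guard is an assumption about how a force-completed operation is kept from completing twice. The sketch also uses exists in place of the early return inside foreach shown in the thread.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object ForceCompleteDemo {
  // Hypothetical, simplified stand-in for a delayed fetch request.
  class DelayedFetchSketch(partitions: Seq[String], isLeaderLocal: String => Boolean) {
    private val completed = new AtomicBoolean(false)

    // Counts how many responses were sent for this request.
    var responsesSent = 0

    // Completes the request at most once; later calls are no-ops.
    def forceComplete(): Boolean =
      if (completed.compareAndSet(false, true)) { onComplete(); true } else false

    // In the real broker this would read the local log and send the
    // response for all partitions, with an error code where needed.
    private def onComplete(): Unit = responsesSent += 1

    // If any partition's leader is no longer local, finish the whole
    // request immediately instead of waiting for more data.
    def tryComplete(): Boolean =
      if (partitions.exists(p => !isLeaderLocal(p))) forceComplete() else false
  }

  def main(args: Array[String]): Unit = {
    val fetch = new DelayedFetchSketch(Seq("a-0", "b-1"), p => p != "b-1")
    println(fetch.tryComplete())  // first attempt completes the request
    println(fetch.tryComplete())  // second attempt finds it already completed
    println(fetch.responsesSent)
  }
}
```

Under this model, a partition whose leader has moved ends the delayed request once, and the response still covers every partition in the request.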



Guozhang



On Wed, Oct 26, 2016 at 7:46 AM, Json Tu <ka...@126.com> wrote:

> It leaves the cluster unable to provide normal service, which causes some
> produce or fetch requests to fail for a long time until I restart the
> current broker. This error may come from some earlier fetch operation that
> contained this partition, which leads to many fetch response errors.
>
> DelayedFetch's tryComplete() function is implemented as below:
>
>  override def tryComplete() : Boolean = {
>    var accumulatedSize = 0
>    fetchMetadata.fetchPartitionStatus.foreach {
>      case (topicAndPartition, fetchStatus) =>
>        val fetchOffset = fetchStatus.startOffsetMetadata
>        try {
>          if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>            // Throws NotLeaderForPartitionException if this broker is not the leader
>            val replica = replicaManager.getLeaderReplicaIfLocal(
>              topicAndPartition.topic, topicAndPartition.partition)
>            /* some code omitted */
>          }
>        } catch {
>          /* some code omitted */
>          case nle: NotLeaderForPartitionException =>  // Case A
>            debug("Broker is no longer the leader of %s, satisfy %s immediately"
>              .format(topicAndPartition, fetchMetadata))
>            // Returns from tryComplete() itself, not just from this iteration
>            return forceComplete()
>        }
>    }
>    /* some code omitted */
>  }
>
> When it meets a NotLeaderForPartitionException, it invokes the
> forceComplete() function, which in turn invokes the onComplete() function,
> implemented as below:
> override def onComplete() {
>   val logReadResults = replicaManager.readFromLocalLog(
>     fetchMetadata.fetchOnlyLeader,
>     fetchMetadata.fetchOnlyCommitted,
>     fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))
>
>   val fetchPartitionData = logReadResults.mapValues(result =>
>     FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))
>
>   responseCallback(fetchPartitionData)
> }
>
> So I think it exits the tryComplete function early because of this
> partition, so the partitions that come later in this request may not be
> fully satisfied and returned to the fetching broker, which leads some
> producers and consumers to fail for a long time. I don't know whether this
> is correct.
>


-- 
-- Guozhang

Re: handleFetchRequest throw exception

Posted by Guozhang Wang <wa...@gmail.com>.
Json,

As you mentioned yourself, the "NotLeaderForPartitionException" thrown
from getLeaderReplicaIfLocal should be caught in the end, so I'm not sure
why the reported stack trace ("ERROR: ...") shows the
NotLeaderForPartitionException propagating out of "tryComplete". I have
also checked the source code of both 0.9.0.0 and 0.8.2.2, and their line
numbers do not match the line numbers in the reported stack trace (e.g.
DelayedFetch.scala:72), so I cannot really tell why you would ever see
this error message instead of the DEBUG-level "Broker is no longer the
leader of %s, satisfy %s immediately..".

Following the 0.9.0.0 source code, since the NotLeaderForPartitionException
is caught and the delayed fetch request is forced to complete, possibly
with some error code, it will not prevent the response to the replica's
fetch request from being returned successfully to the fetching broker, and
hence should not cause producers / consumers to fail for a long time.
Similarly, since we force-complete those delayed fetch requests as well, it
should not cause a spam of repeated error log entries: it should print at
most one entry (and that should be DEBUG, not ERROR) for each delayed
request whose partition leaders have migrated away.
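
The "at most one entry per delayed request" argument can be illustrated
with a minimal sketch. This is not the Kafka source: DelayedFetchSketch,
LeaderMovedException, debugLog and checkAndComplete are hypothetical names,
and the sketch assumes that the purgatory skips operations that are already
completed and that forceComplete() only runs onComplete() for the first
caller.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

object ForceCompleteSketch {
  // Hypothetical stand-in for NotLeaderForPartitionException.
  final class LeaderMovedException(msg: String) extends RuntimeException(msg)

  // Collects the DEBUG lines a broker would have written.
  val debugLog = ArrayBuffer.empty[String]

  // Hypothetical, heavily simplified delayed fetch for a single partition.
  final class DelayedFetchSketch(partition: String) {
    private val completed = new AtomicBoolean(false)
    var responsesSent = 0

    def isCompleted: Boolean = completed.get()

    // Stands in for sending the fetch response (possibly with an error code).
    private def onComplete(): Unit = responsesSent += 1

    // Only the first caller flips the flag and runs onComplete().
    def forceComplete(): Boolean =
      if (completed.compareAndSet(false, true)) { onComplete(); true } else false

    // The leader check always fails here, to simulate a migrated leader.
    def tryComplete(): Boolean =
      try {
        throw new LeaderMovedException(partition)
      } catch {
        case _: LeaderMovedException =>
          debugLog += s"DEBUG Broker is no longer the leader of $partition"
          forceComplete()
      }
  }

  // Assumed purgatory behaviour: completed operations are not retried.
  def checkAndComplete(op: DelayedFetchSketch): Boolean =
    !op.isCompleted && op.tryComplete()

  def main(args: Array[String]): Unit = {
    val op = new DelayedFetchSketch("retail.d.ris.spider.request-1")
    val first = checkAndComplete(op)
    val second = checkAndComplete(op)
    // prints: first=true second=false log=1 responses=1
    println(s"first=$first second=$second log=${debugLog.size} responses=${op.responsesSent}")
  }
}
```

Under those assumptions the exception never leaves tryComplete(), one
response is sent, and one DEBUG line is written, which is why an ERROR
with this stack trace is surprising.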



Guozhang



On Wed, Oct 26, 2016 at 7:46 AM, Json Tu <ka...@126.com> wrote:

>  it make the cluster can not provide normal service,which leades some
> producer or fetch fail for a long time before I restart current broker.
>  this error may be come from some formerly fetch operation which contain
> this partition,which leads many fetch response error.
>
> The delayFetch's tryComplete() function implements as below,
>  override def tryComplete() : Boolean = {
>  var accumulatedSize = 0
>  fetchMetadata.fetchPartitionStatus.foreach {
>    case (topicAndPartition, fetchStatus) =>
>      val fetchOffset = fetchStatus.startOffsetMetadata
>      try {
>        if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
>          val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic,
> topicAndPartition.partition)
>          /*ignore some codes*/
>        }
>      } catch {
>        /*ignore some code*/
>        case nle: NotLeaderForPartitionException =>  // Case A
>          debug("Broker is no longer the leader of %s, satisfy %s
> immediately".format(topicAndPartition, fetchMetadata))
>          return forceComplete()
>      }
>  }
>  /* ignore some codes */
> }
>
> when meet NotLeaderForPartitionException, it will invoke forceComplete()
> function, then it will invoke onComplete() function, which implements as
> below,
> override def onComplete() {
>  val logReadResults = replicaManager.readFromLocalLog(
> fetchMetadata.fetchOnlyLeader,
>    fetchMetadata.fetchOnlyCommitted,
>    fetchMetadata.fetchPartitionStatus.mapValues(status =>
> status.fetchInfo))
>
>  val fetchPartitionData = logReadResults.mapValues(result =>
>    FetchResponsePartitionData(result.errorCode, result.hw,
> result.info.messageSet))
>
>  responseCallback(fetchPartitionData)
> }
>
> so, I think it exit the tryComplete function in advance because of this
> partition, which makes the partition latter in this request may not be
> completely be satisfied and return to the fetch broker,
> which leads some producer and consumer fail for a longtime,I don’t know is
> it correct
>
> > On Oct 25, 2016, at 8:32 PM, Json Tu <ka...@126.com> wrote:
> >
> > Hi all,
> >       I use Kafka 0.9.0.0, and we have a cluster with 6 nodes, when I
> restart a broker,we find there are many logs as below,
> >
> > [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling
> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId:
> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms;
> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] ->
> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] ->
> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
> -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] ->
> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] ->
> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] ->
> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28]
> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0]
> -> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] ->
> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14]
> -> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31]
> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] ->
> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] ->
> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] ->
> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] ->
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] ->
> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] ->
> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] ->
> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1]
> -> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0]
> -> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
> -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55]
> -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] ->
> PartitionFetchInfo(442564,1048576),[waimai_ordersa_
> topic_user_order_in_poi_count_diff,5] -> PartitionFetchInfo(23791010,
> 1048576),[retail.c.order.create,4] -> PartitionFetchInfo(72902,
> 1048576),[waimai_c_ucenter_asyncrelationbind_staging,2] ->
> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] ->
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] ->
> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] ->
> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] ->
> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15]
> -> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] ->
> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24]
> -> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18]
> -> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36]
> -> PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
> > kafka.common.NotLeaderForPartitionException: Leader not local for
> partition [retail.d.ris.spider.request,1] on broker 2141642
> >        at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(
> ReplicaManager.scala:296)
> >        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(
> DelayedFetch.scala:77)
> >        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(
> DelayedFetch.scala:72)
> >        at scala.collection.immutable.HashMap$HashMap1.foreach(
> HashMap.scala:224)
> >        at scala.collection.immutable.HashMap$HashTrieMap.foreach(
> HashMap.scala:403)
> >        at scala.collection.immutable.HashMap$HashTrieMap.foreach(
> HashMap.scala:403)
> >        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
> >        at kafka.server.DelayedOperationPurgatory$
> Watchers.tryCompleteWatched(DelayedOperation.scala:307)
> >        at kafka.server.DelayedOperationPurgatory.checkAndComplete(
> DelayedOperation.scala:227)
> >        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(
> ReplicaManager.scala:202)
> >        at kafka.cluster.Partition.tryCompleteDelayedRequests(
> Partition.scala:372)
> >        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
> >        at kafka.cluster.Partition.updateReplicaLogReadResult(
> Partition.scala:243)
> >        at kafka.server.ReplicaManager$$anonfun$
> updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
> >        at kafka.server.ReplicaManager$$anonfun$
> updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
> >        at scala.collection.immutable.HashMap$HashMap1.foreach(
> HashMap.scala:224)
> >        at scala.collection.immutable.HashMap$HashTrieMap.foreach(
> HashMap.scala:403)
> >        at scala.collection.immutable.HashMap$HashTrieMap.foreach(
> HashMap.scala:403)
> >        at kafka.server.ReplicaManager.updateFollowerLogReadResults(
> ReplicaManager.scala:849)
> >        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.
> scala:467)
> >        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
> >        at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
> >        at kafka.server.KafkaRequestHandler.run(
> KafkaRequestHandler.scala:60)
> >        at java.lang.Thread.run(Thread.java:745)
> >
> >
> > what confused me is that,retail.d.ris.spider.request is not contained
> in this request,why will log it in handleFetchRequest,how can it happen and
> how to resolve it?
> >
> >
>
>
>


-- 
-- Guozhang

Re: handleFetchRequest throw exception

Posted by Json Tu <ka...@126.com>.
It makes the cluster unable to provide normal service, which leads some producers or fetches to fail for a long time until I restart the current broker.
This error may come from some earlier fetch operation that contains this partition, which leads to many fetch response errors.

DelayedFetch's tryComplete() function is implemented as below:
 override def tryComplete() : Boolean = {
 var accumulatedSize = 0
 fetchMetadata.fetchPartitionStatus.foreach {
   case (topicAndPartition, fetchStatus) =>
     val fetchOffset = fetchStatus.startOffsetMetadata
     try {
       if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
         val replica = replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
         /*ignore some codes*/
       }
     } catch {
       /*ignore some code*/
       case nle: NotLeaderForPartitionException =>  // Case A
         debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicAndPartition, fetchMetadata))
         return forceComplete()
     }
 }
 /* ignore some codes */
}

When it meets a NotLeaderForPartitionException, it invokes forceComplete(), which then invokes onComplete(), which is implemented as below:
override def onComplete() {
 val logReadResults = replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
   fetchMetadata.fetchOnlyCommitted,
   fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))

 val fetchPartitionData = logReadResults.mapValues(result =>
   FetchResponsePartitionData(result.errorCode, result.hw, result.info.messageSet))

 responseCallback(fetchPartitionData)
}

So I think it exits the tryComplete function early because of this partition, which means the partitions later in this request may not be completely satisfied and returned to the fetching broker,
which leads some producers and consumers to fail for a long time. I don't know whether this is correct.
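
To make the early-exit question concrete, here is a minimal sketch of the same control flow. It is not the Kafka code: NotLeaderException, localLeaders, leaderCheck and the string results are hypothetical stand-ins, and it assumes Scala 2 semantics, where a `return` inside a `foreach` closure exits the enclosing method.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for NotLeaderForPartitionException.
final class NotLeaderException(msg: String) extends RuntimeException(msg)

object EarlyExitSketch {
  // Partitions this toy broker still leads (assumed data for illustration).
  val localLeaders = Set("a-0", "c-0")

  def leaderCheck(partition: String): Unit =
    if (!localLeaders.contains(partition))
      throw new NotLeaderException(s"Leader not local for partition $partition")

  // Same shape as onComplete(): every partition of the request is read again
  // and gets an entry, either data or an error marker.
  def onComplete(partitions: Seq[String]): Map[String, String] =
    partitions.map { p =>
      p -> (if (localLeaders.contains(p)) "data" else "NotLeaderForPartition")
    }.toMap

  // Same shape as tryComplete(): the first failed leader check forces
  // completion, and the `return` leaves the whole method, not just the closure.
  def tryComplete(partitions: Seq[String],
                  checked: ArrayBuffer[String]): Option[Map[String, String]] = {
    partitions.foreach { p =>
      try {
        checked += p
        leaderCheck(p)
      } catch {
        case _: NotLeaderException =>
          return Some(onComplete(partitions))
      }
    }
    None // nothing forced completion in this pass
  }

  def main(args: Array[String]): Unit = {
    val checked = ArrayBuffer.empty[String]
    val response = tryComplete(Seq("a-0", "b-0", "c-0"), checked)
    println(checked.mkString(",")) // prints: a-0,b-0
    println(response.map(_.keySet))
  }
}
```

In this sketch the loop does stop at "b-0" and never checks "c-0", but the response still has an entry for all three partitions because onComplete() reads every partition of the request again. Whether the real broker behaves the same way in my case is exactly what I am unsure about.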

> On Oct 25, 2016, at 8:32 PM, Json Tu <ka...@126.com> wrote:
> 
> Hi all,
> 	I use Kafka 0.9.0.0, and we have a cluster with 6 nodes, when I restart a broker,we find there are many logs as below,
> 
> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] -> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25] -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5] -> 
PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] -> PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2] -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] -> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24] -> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18] -> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36] -> PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
> kafka.common.NotLeaderForPartitionException: Leader not local for partition [retail.d.ris.spider.request,1] on broker 2141642
>        at kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
>        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
>        at kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
>        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
>        at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>        at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>        at kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
>        at kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
>        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
>        at kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
>        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
>        at kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
>        at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
>        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
>        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
>        at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>        at java.lang.Thread.run(Thread.java:745)
> 
> 
> what confused me is that,retail.d.ris.spider.request is not contained in this request,why will log it in handleFetchRequest,how can it happen and how to resolve it?
> 
> 


