Posted to dev@kafka.apache.org by "Warren Jin (JIRA)" <ji...@apache.org> on 2015/09/23 10:58:04 UTC

[jira] [Updated] (KAFKA-2575) inconsistent offset count in replication-offset-checkpoint

     [ https://issues.apache.org/jira/browse/KAFKA-2575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Warren Jin updated KAFKA-2575:
------------------------------
    Summary: inconsistent offset count in replication-offset-checkpoint  (was: kafka.server.OffsetCheckpoint found inconsistent offset entry count, leads to NotAssignedReplicaException)

> inconsistent offset count in replication-offset-checkpoint
> ----------------------------------------------------------
>
>                 Key: KAFKA-2575
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2575
>             Project: Kafka
>          Issue Type: Bug
>          Components: kafka streams
>    Affects Versions: 0.8.2.1
>            Reporter: Warren Jin
>
> We have more than 100 topics in production; the default partition count is 24 per topic.
> We have noticed the following errors in recent days.
> 2015-09-22 22:25:12,529 ERROR Error on broker 1 while processing LeaderAndIsr request correlationId 438501 received from controller 2 epoch 12 for partition [LOGIST.DELIVERY.SUBSCRIBE,7] (state.change.logger)
> java.io.IOException: Expected 3918 entries but found only 3904
> 	at kafka.server.OffsetCheckpoint.read(OffsetCheckpoint.scala:99)
> 	at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
> 	at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
> 	at kafka.cluster.Partition$$anonfun$makeLeader$1$$anonfun$apply$mcZ$sp$4.apply(Partition.scala:171)
> 	at scala.collection.immutable.Set$Set3.foreach(Set.scala:115)
> 	at kafka.cluster.Partition$$anonfun$makeLeader$1.apply$mcZ$sp(Partition.scala:171)
> 	at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
> 	at kafka.cluster.Partition$$anonfun$makeLeader$1.apply(Partition.scala:163)
> 	at kafka.utils.Utils$.inLock(Utils.scala:535)
> 	at kafka.utils.Utils$.inWriteLock(Utils.scala:543)
> 	at kafka.cluster.Partition.makeLeader(Partition.scala:163)
> 	at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:427)
> 	at kafka.server.ReplicaManager$$anonfun$makeLeaders$5.apply(ReplicaManager.scala:426)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 	at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:426)
> 	at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:378)
> 	at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:120)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:63)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> 	at java.lang.Thread.run(Thread.java:745)
> Then it repeatedly prints out the following error messages:
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14943530 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,22] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,22] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,1] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,1] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,4] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,4] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 13477660 from client ReplicaFetcherThread-2-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,10] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,10] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15022337 from client ReplicaFetcherThread-1-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,13] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,13] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 15078431 from client ReplicaFetcherThread-0-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,16] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,16] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 WARN [Replica Manager on Broker 1]: Fetch request with correlation id 14988525 from client ReplicaFetcherThread-3-1 on partition [LOGIST.DELIVERY.SUBSCRIBE,19] failed due to Leader not local for partition [LOGIST.DELIVERY.SUBSCRIBE,19] on broker 1 (kafka.server.ReplicaManager)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15022337; ClientId: ReplicaFetcherThread-1-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,1] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,13] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,1]
> 	at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
> 	at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
> 	at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> 	at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
> 	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> 	at java.lang.Thread.run(Thread.java:745)
> 2015-09-23 10:20:03 525 ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; CorrelationId: 15078431; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 0; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [LOGIST.DELIVERY.SUBSCRIBE,4] -> PartitionFetchInfo(0,1048576),[LOGIST.DELIVERY.SUBSCRIBE,16] -> PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
> kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 0's position -1 since the replica is not recognized to be one of the assigned replicas  for partition [LOGIST.DELIVERY.SUBSCRIBE,4]
> 	at kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
> 	at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
> 	at kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.immutable.Map$Map2.foreach(Map.scala:130)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
> 	at kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
> 	at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
> 	at java.lang.Thread.run(Thread.java:745)
> I checked the Kafka source code: the replica manager writes the offsets of all partitions to replication-offset-checkpoint every 5 seconds, and OffsetCheckpoint holds an internal lock on this file for every write, so it should be impossible for the file header to declare 3918 offset entries while only 3904 entries are actually present. Could this be a multithreading issue, where some other thread flushes content to the same file despite the internal lock in OffsetCheckpoint?
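> To make the failure mode concrete, here is a simplified sketch (not Kafka's actual OffsetCheckpoint implementation; the helper names are mine) of the checkpoint file layout as I understand it and the consistency check that produces the "Expected ... entries but found only ..." error:
> // Simplified sketch of the replication-offset-checkpoint layout and the
> // count check that raises the IOException seen in the log above.
> // Assumed file layout, one value per line:
> //   line 1: format version (e.g. 0)
> //   line 2: declared number of offset entries (e.g. 3918)
> //   then one "topic partition offset" line per partition
> import java.io.{File, IOException}
> import scala.io.Source
> 
> case class TopicAndPartition(topic: String, partition: Int)
> 
> def readCheckpoint(file: File): Map[TopicAndPartition, Long] = {
>   val lines = Source.fromFile(file).getLines().toList
>   val declared = lines(1).trim.toInt                // entry count recorded in the header
>   val entries = lines.drop(2).filter(_.nonEmpty).map { line =>
>     val Array(topic, partition, offset) = line.split("\\s+")
>     TopicAndPartition(topic, partition.toInt) -> offset.toLong
>   }
>   // The situation from the log: the header claims 3918 entries but the body
>   // only contains 3904 lines, e.g. after a partial or interleaved write.
>   if (entries.size != declared)
>     throw new IOException(s"Expected $declared entries but found only ${entries.size}")
>   entries.toMap
> }
> If the declared count and the body can get out of sync like this, then either the periodic writer was interrupted mid-write or something else touched the file concurrently, which is what I am trying to confirm.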



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)