Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/01/04 20:32:58 UTC

[jira] [Updated] (KAFKA-4523) Controlled shutdown fails if consumer group restabilizes during shutdown

     [ https://issues.apache.org/jira/browse/KAFKA-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-4523:
-------------------------------
    Fix Version/s: 0.10.2.0

> Controlled shutdown fails if consumer group restabilizes during shutdown
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-4523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4523
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Steve Niemitz
>             Fix For: 0.10.2.0
>
>
> If I begin a controlled shutdown of a broker that is a coordinator for a consumer group, often the shutdown will fail with the following error:
> {code}
> [2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down completely (kafka.server.ReplicaManager)
> [2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
> [2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize group my-consumer-group with old generation 2673 (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to unrecoverable I/O error while handling produce request:  (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log '__consumer_offsets-33'
> 	at kafka.log.Log.append(Log.scala:349)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
> 	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
> 	at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
> 	at kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
> 	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
> 	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
> 	at scala.Option.foreach(Option.scala:236)
> 	at kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
> 	at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
> 	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
> 	at kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
> 	at kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
> 	at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
> 	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
> 	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
> 	at kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
> 	at kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
> 	at kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
> 	at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
> 	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
> 	at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
> 	at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
> 	at kafka.log.Log.roll(Log.scala:773)
> 	at kafka.log.Log.maybeRoll(Log.scala:742)
> 	at kafka.log.Log.append(Log.scala:405)
> 	... 35 more
> {code}
> This then causes the broker to run recovery on all log segments at the next startup, which is not ideal. It looks like the group coordinator is shut down after the log manager [1]; should the order be reversed?
> [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588
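
The ordering question raised in the description can be illustrated with a toy model. The sketch below is not Kafka code: SketchLog, SketchCoordinator, and ShutdownOrderSketch are hypothetical stand-ins, and the only assumption is the one the report itself suggests, namely that a coordinator which is still active after the log has been closed may attempt a late group-metadata append. Under that assumption, closing the log first lets the late append fail, while stopping the coordinator first prevents the append from being attempted.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a log; not Kafka's kafka.log.Log.
final class SketchLog {
  private var open = true
  val appended = ArrayBuffer.empty[String]

  // Appending to a closed log fails, loosely mirroring the ClosedChannelException in the trace.
  def append(record: String): Unit = {
    if (!open) throw new IllegalStateException("log is closed")
    appended += record
  }

  def close(): Unit = open = false
}

// Hypothetical stand-in for a group coordinator that persists group metadata to the log.
final class SketchCoordinator(log: SketchLog) {
  private var active = true

  // Models a late rebalance (for example after a heartbeat expiration).
  // Returns false without touching the log once the coordinator has been shut down.
  def onRebalance(group: String): Boolean =
    if (!active) false
    else { log.append(s"metadata for $group"); true }

  def shutdown(): Unit = active = false
}

object ShutdownOrderSketch {
  // Returns true if a rebalance arriving during shutdown hits a closed log.
  def lateWriteFails(coordinatorFirst: Boolean): Boolean = {
    val log = new SketchLog
    val coordinator = new SketchCoordinator(log)

    if (coordinatorFirst) { coordinator.shutdown(); log.close() }
    else log.close() // coordinator is still active at this point

    val failed =
      try { coordinator.onRebalance("my-consumer-group"); false }
      catch { case _: IllegalStateException => true }

    if (!coordinatorFirst) coordinator.shutdown()
    failed
  }

  def main(args: Array[String]): Unit = {
    println(s"log closed first:        late write fails = ${lateWriteFails(coordinatorFirst = false)}")
    println(s"coordinator stopped first: late write fails = ${lateWriteFails(coordinatorFirst = true)}")
  }
}
```

In this model the first ordering reports a failed late write and the second does not. Whether reversing the order is sufficient in the real broker depends on details the sketch leaves out, such as delayed operations that are already in flight when shutdown begins.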



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)