Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/01/04 19:59:58 UTC

[jira] [Commented] (KAFKA-4523) Controlled shutdown fails if consumer group restabilizes during shutdown

    [ https://issues.apache.org/jira/browse/KAFKA-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15799183#comment-15799183 ] 

ASF GitHub Bot commented on KAFKA-4523:
---------------------------------------

GitHub user steveniemitz opened a pull request:

    https://github.com/apache/kafka/pull/2311

    KAFKA-4523; Fix crash at shutdown due to the group coordinator attempting to write to a closed log.
    
    Solution: shut down the group coordinator before shutting down the log manager, so that any pending delayed operations complete before the logs are closed.
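    
    A minimal, self-contained sketch of the ordering problem the fix addresses (toy classes only, not Kafka's real APIs; all names here are illustrative):
    
    import java.util.concurrent.{Executors, TimeUnit}
    
    // Toy stand-ins: a log that rejects appends once closed, and a
    // coordinator whose delayed operation appends to that log when it fires.
    final class ToyLog {
      @volatile private var closed = false
      def append(record: String): Unit =
        if (closed) throw new java.nio.channels.ClosedChannelException
        else println(s"appended: $record")
      def close(): Unit = closed = true
    }
    
    final class ToyCoordinator(log: ToyLog) {
      private val purgatory = Executors.newSingleThreadScheduledExecutor()
    
      // Simulates a delayed operation (e.g. an expiring heartbeat) that
      // writes group metadata when it fires.
      def scheduleDelayedWrite(): Unit =
        purgatory.schedule(new Runnable {
          def run(): Unit = log.append("group metadata")
        }, 200, TimeUnit.MILLISECONDS)
    
      // Draining the executor before returning mirrors "any pending delayed
      // operations complete" from the fix description.
      def shutdown(): Unit = {
        purgatory.shutdown()
        purgatory.awaitTermination(5, TimeUnit.SECONDS)
      }
    }
    
    object ShutdownOrder extends App {
      val log = new ToyLog
      val coordinator = new ToyCoordinator(log)
      coordinator.scheduleDelayedWrite()
    
      // Fixed order: drain the coordinator first, then close the log.
      // Swapping these two calls reproduces the failure mode: the delayed
      // write fires against a closed log and throws ClosedChannelException.
      coordinator.shutdown()
      log.close()
    }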

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tc-dc/kafka KAFKA-4523

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2311
    
----
commit d4d04530f6cf8cd8e8fb23fd087e0549dacd0cfe
Author: steve <sn...@twitter.com>
Date:   2017-01-04T19:48:33Z

    KAFKA-4523; Fix crash at shutdown due to the group coordinator attempting to write to a closed log.

----


> Controlled shutdown fails if consumer group restabilizes during shutdown
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-4523
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4523
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Steve Niemitz
>
> If I begin a controlled shutdown of a broker that is a coordinator for a consumer group, often the shutdown will fail with the following error:
> {code}
> [2016-12-12 16:24:15,424] INFO [Replica Manager on Broker 10]: Shut down completely (kafka.server.ReplicaManager)
> [2016-12-12 16:24:15,424] INFO [ExpirationReaper-10], Shutting down (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Stopped  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,450] INFO [ExpirationReaper-10], Shutdown completed (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2016-12-12 16:24:15,451] INFO Shutting down. (kafka.log.LogManager)
> [2016-12-12 16:24:31,241] INFO [GroupCoordinator 10]: Preparing to restabilize group my-consumer-group with old generation 2673 (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,499] INFO [GroupCoordinator 10]: Group my-consumer-group with generation 2674 is now empty (kafka.coordinator.GroupCoordinator)
> [2016-12-12 16:24:32,515] FATAL [Replica Manager on Broker 10]: Halting due to unrecoverable I/O error while handling produce request:  (kafka.server.ReplicaManager)
> kafka.common.KafkaStorageException: I/O exception in append to log '__consumer_offsets-33'
> 	at kafka.log.Log.append(Log.scala:349)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:443)
> 	at kafka.cluster.Partition$$anonfun$10.apply(Partition.scala:429)
> 	at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> 	at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:240)
> 	at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:407)
> 	at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:393)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:393)
> 	at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:330)
> 	at kafka.coordinator.GroupMetadataManager.store(GroupMetadataManager.scala:251)
> 	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
> 	at kafka.coordinator.GroupCoordinator$$anonfun$onCompleteJoin$6.apply(GroupCoordinator.scala:726)
> 	at scala.Option.foreach(Option.scala:236)
> 	at kafka.coordinator.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:726)
> 	at kafka.coordinator.DelayedJoin.onComplete(DelayedJoin.scala:39)
> 	at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
> 	at kafka.coordinator.DelayedJoin$$anonfun$tryComplete$1.apply$mcZ$sp(DelayedJoin.scala:37)
> 	at kafka.coordinator.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:672)
> 	at kafka.coordinator.DelayedJoin.tryComplete(DelayedJoin.scala:37)
> 	at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:315)
> 	at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:234)
> 	at kafka.coordinator.GroupCoordinator.onMemberFailure(GroupCoordinator.scala:665)
> 	at kafka.coordinator.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:740)
> 	at kafka.coordinator.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:33)
> 	at kafka.server.DelayedOperation.run(DelayedOperation.scala:107)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
> 	at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
> 	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300)
> 	at kafka.log.FileMessageSet.truncateTo(FileMessageSet.scala:405)
> 	at kafka.log.FileMessageSet.trim(FileMessageSet.scala:378)
> 	at kafka.log.Log.roll(Log.scala:773)
> 	at kafka.log.Log.maybeRoll(Log.scala:742)
> 	at kafka.log.Log.append(Log.scala:405)
> 	... 35 more
> {code}
> This then causes the broker to attempt recovery on all log segments at the next startup, which is obviously not ideal.  It looks like the group coordinator is shut down after the log manager [1]; should the order be reversed?
> [1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaServer.scala#L588
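
For reference, the "Caused by" above is plain JDK behaviour rather than anything Kafka-specific: any FileChannel operation after close() throws ClosedChannelException, so the append path was doomed from the moment the log manager closed the segment files. A standalone reproduction (the file name is illustrative only):

{code}
import java.nio.ByteBuffer
import java.nio.channels.{ClosedChannelException, FileChannel}
import java.nio.file.{Files, StandardOpenOption}

object ClosedChannelDemo extends App {
  val path = Files.createTempFile("consumer-offsets-demo", ".log")
  val channel = FileChannel.open(path, StandardOpenOption.WRITE)
  channel.close()
  try channel.write(ByteBuffer.wrap("offset commit".getBytes("UTF-8")))
  catch {
    case e: ClosedChannelException =>
      // Same exception class that Log.append ultimately hit above.
      println(s"append after close failed: $e")
  }
}
{code}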



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)