Posted to users@kafka.apache.org by Gagan Agrawal <ag...@gmail.com> on 2016/06/06 06:07:30 UTC

Consumer offsets not committed in zookeeper during graceful shutdown

Hi,
I am trying to shut down a Kafka consumer (version 0.8.2) gracefully by
calling consumer.shutdown (ConsumerConnector.shutdown) and then waiting for
the executor threads to finish. However, I have noticed that on the next
start some of the messages are replayed. I have auto commit enabled.

I looked at the code of kafka.consumer.ZookeeperConsumerConnector
<https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala>
and realised that there is no commitOffset call during shutdown. The
relevant snippet from the ZookeeperConsumerConnector class is included
below. Ideally, after shutting down the fetchers, commitOffset should be
called to commit all offsets fetched so far. I also noticed that zkClient
is closed during shutdown, so I cannot call commitOffset from outside after
shutdown either.

Is this the expected behaviour, or is there something I am missing? Is there
any way, using the high-level consumer, to make sure all offsets are
committed before shutting down?


def shutdown() {
  val canShutdown = isShuttingDown.compareAndSet(false, true);
  if (canShutdown) {
    logger.info("ZKConsumerConnector shutting down")
    try {
      scheduler.shutdown
      fetcher match {
        case Some(f) => f.shutdown
        case None =>
      }
      sendShudownToAllQueues
      if (zkClient != null) {
        zkClient.close()
        zkClient = null
      }
    }
    catch {
      case e =>
        logger.fatal(e)
        logger.fatal(Utils.stackTrace(e))
    }
    logger.info("ZKConsumerConnector shut down completed")
  }
}
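
To make the ordering I would have expected concrete, here is a minimal,
self-contained sketch. FakeZkClient and SketchConnector are hypothetical
stand-ins invented for illustration, not Kafka classes, and the fetcher
shutdown is only indicated by a comment. The sketch models one thing only:
offsets are flushed while the client is still open, and the client is closed
afterwards.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical stand-in for the ZooKeeper client; NOT Kafka's real class.
class FakeZkClient {
  val committed = mutable.Map.empty[String, Long]
  private var open = true

  // Refuses writes once closed, like a real client would fail after close().
  def write(partition: String, offset: Long): Unit = {
    require(open, "client already closed")
    committed(partition) = offset
  }

  def close(): Unit = {
    open = false
  }
}

// Hypothetical connector that only models the order of the shutdown steps.
class SketchConnector(zk: FakeZkClient, autoCommit: Boolean) {
  private val isShuttingDown = new AtomicBoolean(false)
  private val consumed = mutable.Map.empty[String, Long]

  // Record the latest offset handed to the application for a partition.
  def markConsumed(partition: String, offset: Long): Unit = {
    consumed(partition) = offset
  }

  // Write every recorded offset to the (fake) client.
  def commitOffsets(): Unit = {
    consumed.foreach { case (partition, offset) => zk.write(partition, offset) }
  }

  def shutdown(): Unit = {
    if (isShuttingDown.compareAndSet(false, true)) {
      // Step 1: stop the fetchers (omitted in this sketch).
      // Step 2: flush offsets while the client is still open.
      if (autoCommit) commitOffsets()
      // Step 3: only now close the client.
      zk.close()
    }
  }
}

object ShutdownOrderSketch {
  def main(args: Array[String]): Unit = {
    val zk = new FakeZkClient
    val connector = new SketchConnector(zk, autoCommit = true)
    connector.markConsumed("topic-0", 42L)
    connector.shutdown()
    println(zk.committed)
  }
}
```

If step 2 were moved after step 3 in this sketch, the write would fail
because the client is already closed, which is the same reason I cannot
commit from outside once shutdown has been called.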