You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/06 21:57:38 UTC
svn commit: r1179796 -
/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Author: nehanarkhede
Date: Thu Oct 6 19:57:38 2011
New Revision: 1179796
URL: http://svn.apache.org/viewvc?rev=1179796&view=rev
Log:
KAFKA-89 consumer should initialize to a valid offset;patched by junrao; reviewed by jjkoshy
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1179796&r1=1179795&r2=1179796&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Oct 6 19:57:38 2011
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Even
import kafka.api.OffsetRequest
import java.util.UUID
import kafka.serializer.Decoder
+import kafka.common.InvalidConfigException
/**
* This class handles the consumers interaction with zookeeper
@@ -302,7 +303,10 @@ private[kafka] class ZookeeperConsumerCo
return -2
}
- def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+ def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
+ earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
+
+ private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
try {
@@ -310,13 +314,12 @@ private[kafka] class ZookeeperConsumerCo
val broker = cluster.getBroker(brokerId)
simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize)
- val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
- OffsetRequest.LatestTime, 1)
- producedOffset = latestOffset(0)
+ val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
+ producedOffset = offsets(0)
}
catch {
case e =>
- logger.error("error in getLatestOffset jmx ", e)
+ logger.error("error in earliestOrLatestOffset() ", e)
}
finally {
if (simpleConsumer != null)
@@ -553,9 +556,19 @@ private[kafka] class ZookeeperConsumerCo
val znode = topicDirs.consumerOffsetDir + "/" + partition.name
val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
- // If first time starting a consumer, use default offset.
- // TODO: handle this better (if client doesn't know initial offsets)
- val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong
+ // If first time starting a consumer, set the initial offset based on the config
+ var offset : Long = 0L
+ if (offsetString == null)
+ offset = config.autoOffsetReset match {
+ case OffsetRequest.SmallestTimeString =>
+ earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime)
+ case OffsetRequest.LargestTimeString =>
+ earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime)
+ case _ =>
+ throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+ }
+ else
+ offset = offsetString.toLong
val queue = queues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)