You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2011/10/06 21:57:38 UTC

svn commit: r1179796 - /incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Author: nehanarkhede
Date: Thu Oct  6 19:57:38 2011
New Revision: 1179796

URL: http://svn.apache.org/viewvc?rev=1179796&view=rev
Log:
KAFKA-89 consumer should initialize to a valid offset;patched by junrao; reviewed by jjkoshy

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1179796&r1=1179795&r2=1179796&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Oct  6 19:57:38 2011
@@ -30,6 +30,7 @@ import org.apache.zookeeper.Watcher.Even
 import kafka.api.OffsetRequest
 import java.util.UUID
 import kafka.serializer.Decoder
+import kafka.common.InvalidConfigException
 
 /**
  * This class handles the consumers interaction with zookeeper
@@ -302,7 +303,10 @@ private[kafka] class ZookeeperConsumerCo
     return -2
   }
 
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
+  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
+    earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
+
+  private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
     try {
@@ -310,13 +314,12 @@ private[kafka] class ZookeeperConsumerCo
       val broker = cluster.getBroker(brokerId)
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
                                             ConsumerConfig.SocketBufferSize)
-      val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
-                                                       OffsetRequest.LatestTime, 1)
-      producedOffset = latestOffset(0)
+      val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
+      producedOffset = offsets(0)
     }
     catch {
       case e =>
-        logger.error("error in getLatestOffset jmx ", e)
+        logger.error("error in earliestOrLatestOffset() ", e)
     }
     finally {
       if (simpleConsumer != null)
@@ -553,9 +556,19 @@ private[kafka] class ZookeeperConsumerCo
 
       val znode = topicDirs.consumerOffsetDir + "/" + partition.name
       val offsetString = ZkUtils.readDataMaybeNull(zkClient, znode)
-      // If first time starting a consumer, use default offset.
-      // TODO: handle this better (if client doesn't know initial offsets)
-      val offset : Long = if (offsetString == null) Long.MaxValue else offsetString.toLong
+      // If first time starting a consumer, set the initial offset based on the config
+      var offset : Long = 0L
+      if (offsetString == null)
+        offset = config.autoOffsetReset match {
+              case OffsetRequest.SmallestTimeString =>
+                  earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.EarliestTime)
+              case OffsetRequest.LargestTimeString =>
+                  earliestOrLatestOffset(topic, partition.brokerId, partition.partId, OffsetRequest.LatestTime)
+              case _ =>
+                  throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+        }
+      else
+        offset = offsetString.toLong
       val queue = queues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)