Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/27 21:20:09 UTC

svn commit: r1391168 - in /incubator/kafka/branches/0.8: contrib/hadoop-consumer/src/main/java/kafka/etl/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/javaapi/consumer/ ...

Author: jjkoshy
Date: Thu Sep 27 19:20:08 2012
New Revision: 1391168

URL: http://svn.apache.org/viewvc?rev=1391168&view=rev
Log:
getOffset API should return a different latest offset to follower and non-follower consumers. Also, implement a batched version of the getOffset API. Patched by Joel Koshy; reviewed by Jun Rao; KAFKA-501
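
For context, a minimal sketch (not part of this commit) of how a client issues the new batched request against the Scala API introduced below; the broker host/port and topic names are placeholders:

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition
    import kafka.consumer.SimpleConsumer

    // Placeholder broker address and topics, for illustration only.
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 100000)
    // One request can now carry offset queries for many topic/partitions.
    val request = OffsetRequest(Map(
      TopicAndPartition("topicA", 0) -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1),
      TopicAndPartition("topicB", 3) -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
    val response = consumer.getOffsetsBefore(request)
    // With the default (non-follower) replicaId, the latest offset is capped
    // at the leader's high watermark (see the KafkaApis change below).
    val latestA = response.partitionErrorAndOffsets(TopicAndPartition("topicA", 0)).offsets.head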

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
Modified:
    incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala

Modified: incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (original)
+++ incubator/kafka/branches/0.8/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Thu Sep 27 19:20:08 2012
@@ -16,15 +16,20 @@
  */
 package kafka.etl;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
-import kafka.api.OffsetRequest;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
 import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
@@ -144,7 +149,7 @@ public class KafkaETLContext {
     
     public boolean fetchMore () throws IOException {
         if (!hasMore()) return false;
-        
+
         FetchRequest fetchRequest = new FetchRequestBuilder()
                 .correlationId(requestId)
                 .clientId(_request.clientId())
@@ -216,15 +221,23 @@ public class KafkaETLContext {
         /* get smallest and largest offsets*/
         long[] range = new long[2];
 
-        long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
-                OffsetRequest.EarliestTime(), 1);
+        TopicAndPartition topicAndPartition = new TopicAndPartition(_request.getTopic(), _request.getPartition());
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
+        OffsetRequest request = new OffsetRequest(
+            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
+        long[] startOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
         if (startOffsets.length != 1)
             throw new IOException("input:" + _input + " Expect one smallest offset but get "
                                             + startOffsets.length);
         range[0] = startOffsets[0];
         
-        long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
-                                        OffsetRequest.LatestTime(), 1);
+        requestInfo.clear();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        request = new OffsetRequest(
+            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), kafka.api.OffsetRequest.DefaultClientId());
+        long[] endOffsets = _consumer.getOffsetsBefore(request).offsets(_request.getTopic(), _request.getPartition());
         if (endOffsets.length != 1)
             throw new IOException("input:" + _input + " Expect one latest offset but get " 
                                             + endOffsets.length);

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Thu Sep 27 19:20:08 2012
@@ -30,8 +30,6 @@ object FetchRequest {
   val CurrentVersion = 1.shortValue()
   val DefaultCorrelationId = -1
   val DefaultClientId = ""
-  val DefaultReplicaId = -1
-  val NonFollowerId = DefaultReplicaId
   val DefaultMaxWait = 0
   val DefaultMinBytes = 0
 
@@ -60,7 +58,7 @@ object FetchRequest {
 case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                         correlationId: Int = FetchRequest.DefaultCorrelationId,
                         clientId: String = FetchRequest.DefaultClientId,
-                        replicaId: Int = FetchRequest.DefaultReplicaId,
+                        replicaId: Int = Request.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
                         requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
@@ -113,7 +111,7 @@ case class FetchRequest(versionId: Short
     })
   }
 
-  def isFromFollower = replicaId != FetchRequest.NonFollowerId
+  def isFromFollower = replicaId != Request.NonFollowerId
 
   def numPartitions = requestInfo.size
 }
@@ -124,7 +122,7 @@ class FetchRequestBuilder() {
   private var correlationId = FetchRequest.DefaultCorrelationId
   private val versionId = FetchRequest.CurrentVersion
   private var clientId = FetchRequest.DefaultClientId
-  private var replicaId = FetchRequest.DefaultReplicaId
+  private var replicaId = Request.DefaultReplicaId
   private var maxWait = FetchRequest.DefaultMaxWait
   private var minBytes = FetchRequest.DefaultMinBytes
   private val requestMap = new collection.mutable.HashMap[TopicAndPartition, PartitionFetchInfo]

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Thu Sep 27 19:20:08 2012
@@ -19,6 +19,8 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import kafka.utils.Utils
+import kafka.common.TopicAndPartition
+
 
 object OffsetRequest {
   val CurrentVersion = 1.shortValue()
@@ -32,36 +34,67 @@ object OffsetRequest {
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
-    val topic = Utils.readShortString(buffer, "UTF-8")
-    val partition = buffer.getInt()
-    val offset = buffer.getLong
-    val maxNumOffsets = buffer.getInt
-    new OffsetRequest(versionId, clientId, topic, partition, offset, maxNumOffsets)
+    val replicaId = buffer.getInt
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = Utils.readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val time = buffer.getLong
+        val maxNumOffsets = buffer.getInt
+        (TopicAndPartition(topic, partitionId), PartitionOffsetRequestInfo(time, maxNumOffsets))
+      })
+    })
+    OffsetRequest(Map(pairs:_*), versionId = versionId, clientId = clientId, replicaId = replicaId)
   }
 }
 
-case class OffsetRequest(versionId: Short = OffsetRequest.CurrentVersion,
-                    clientId: String = OffsetRequest.DefaultClientId,
-                    topic: String,
-                    partition: Int,
-                    time: Long,
-                    maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
-  def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
-    this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
+case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
 
+case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequestInfo],
+                         versionId: Short = OffsetRequest.CurrentVersion,
+                         clientId: String = OffsetRequest.DefaultClientId,
+                         replicaId: Int = Request.DefaultReplicaId)
+        extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
 
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
-    Utils.writeShortString(buffer, topic)
-    buffer.putInt(partition)
-    buffer.putLong(time)
-    buffer.putInt(maxNumOffsets)
+    buffer.putInt(replicaId)
+
+    buffer.putInt(requestInfoGroupedByTopic.size) // topic count
+    requestInfoGroupedByTopic.foreach {
+      case((topic, partitionInfos)) =>
+        Utils.writeShortString(buffer, topic)
+        buffer.putInt(partitionInfos.size) // partition count
+        partitionInfos.foreach {
+          case (TopicAndPartition(_, partition), partitionInfo) =>
+            buffer.putInt(partition)
+            buffer.putLong(partitionInfo.time)
+            buffer.putInt(partitionInfo.maxNumOffsets)
+        }
+    }
   }
 
-  def sizeInBytes(): Int = 2 + (2 + clientId.length()) + (2 + topic.length) + 4 + 8 + 4
+  def sizeInBytes =
+    2 + /* versionId */
+    Utils.shortStringLength(clientId, RequestOrResponse.DefaultCharset) +
+    4 + /* replicaId */
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, partitionInfos) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic, RequestOrResponse.DefaultCharset) +
+      4 + /* partition count */
+      partitionInfos.size * (
+        4 + /* partition */
+        8 + /* time */
+        4 /* maxNumOffsets */
+      )
+    })
 
-  override def toString(): String= "OffsetRequest(version:" + versionId + ", client id:" + clientId +
-          ", topic:" + topic + ", part:" + partition + ", time:" + time + ", maxNumOffsets:" + maxNumOffsets + ")"
+  def isFromFollower = replicaId != Request.NonFollowerId
 }

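The reworked request serializes its whole partition map, and sizeInBytes must agree byte-for-byte with writeTo. A minimal round-trip sketch (mirroring RpcDataSerializationTest below, with a placeholder topic):

    import java.nio.ByteBuffer
    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition

    val request = OffsetRequest(
      Map(TopicAndPartition("topicA", 0) -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
    // Allocate exactly what writeTo will produce, then decode it back.
    val buffer = ByteBuffer.allocate(request.sizeInBytes)
    request.writeTo(buffer)
    buffer.rewind()
    assert(OffsetRequest.readFrom(buffer) == request)
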
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetResponse.scala Thu Sep 27 19:20:08 2012
@@ -18,43 +18,77 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import kafka.common.ErrorMapping
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.utils.Utils
 
 
 object OffsetResponse {
+
   def readFrom(buffer: ByteBuffer): OffsetResponse = {
     val versionId = buffer.getShort
-    val errorCode = buffer.getShort
-    val offsetsSize = buffer.getInt
-    val offsets = new Array[Long](offsetsSize)
-    for( i <- 0 until offsetsSize) {
-      offsets(i) = buffer.getLong
-    }
-    new OffsetResponse(versionId, offsets, errorCode)
+    val numTopics = buffer.getInt
+    val pairs = (1 to numTopics).flatMap(_ => {
+      val topic = Utils.readShortString(buffer)
+      val numPartitions = buffer.getInt
+      (1 to numPartitions).map(_ => {
+        val partition = buffer.getInt
+        val error = buffer.getShort
+        val numOffsets = buffer.getInt
+        val offsets = (1 to numOffsets).map(_ => buffer.getLong)
+        (TopicAndPartition(topic, partition), PartitionOffsetsResponse(error, offsets))
+      })
+    })
+    OffsetResponse(versionId, Map(pairs:_*))
   }
+
 }
 
+
+case class PartitionOffsetsResponse(error: Short, offsets: Seq[Long])
+
+
 case class OffsetResponse(versionId: Short,
-                          offsets: Array[Long],
-                          errorCode: Short = ErrorMapping.NoError) extends RequestOrResponse{
-  val sizeInBytes = 2 + 2 + offsets.foldLeft(4)((sum, _) => sum + 8)
+                          partitionErrorAndOffsets: Map[TopicAndPartition, PartitionOffsetsResponse])
+        extends RequestOrResponse {
 
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    /* error code */
-    buffer.putShort(errorCode)
-    buffer.putInt(offsets.length)
-    offsets.foreach(buffer.putLong(_))
+  lazy val offsetsGroupedByTopic = partitionErrorAndOffsets.groupBy(_._1.topic)
+
+  def hasError = partitionErrorAndOffsets.values.exists(_.error != ErrorMapping.NoError)
+
+  val sizeInBytes = {
+    2 + /* versionId */
+    4 + /* topic count */
+    offsetsGroupedByTopic.foldLeft(0)((foldedTopics, currTopic) => {
+      val (topic, errorAndOffsetsMap) = currTopic
+      foldedTopics +
+      Utils.shortStringLength(topic) +
+      4 + /* partition count */
+      errorAndOffsetsMap.foldLeft(0)((foldedPartitions, currPartition) => {
+        foldedPartitions +
+        4 + /* partition id */
+        2 + /* partition error */
+        4 + /* offset array length */
+        currPartition._2.offsets.size * 8 /* offset */
+      })
+    })
   }
 
-    // need to override case-class equals due to broken java-array equals()
-  override def equals(other: Any): Boolean = {
-   other match {
-      case that: OffsetResponse =>
-        ( versionId == that.versionId &&
-          errorCode == that.errorCode &&
-          offsets.toSeq == that.offsets.toSeq)
-      case _ => false
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    buffer.putInt(offsetsGroupedByTopic.size) // topic count
+    offsetsGroupedByTopic.foreach {
+      case((topic, errorAndOffsetsMap)) =>
+        Utils.writeShortString(buffer, topic)
+        buffer.putInt(errorAndOffsetsMap.size) // partition count
+        errorAndOffsetsMap.foreach {
+          case((TopicAndPartition(_, partition), errorAndOffsets)) =>
+            buffer.putInt(partition)
+            buffer.putShort(errorAndOffsets.error)
+            buffer.putInt(errorAndOffsets.offsets.size) // offset array length
+            errorAndOffsets.offsets.foreach(buffer.putLong(_))
+        }
     }
   }
+
 }
+

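Errors are now reported per partition instead of as a single response-level error code, so callers inspect (or throw on) each partition's error. A sketch of one way to do that, using the existing ErrorMapping helper; the helper function name is hypothetical:

    import kafka.api.OffsetResponse
    import kafka.common.ErrorMapping

    // Hypothetical helper: throws for the first partition-level error found.
    def maybeThrowPartitionErrors(response: OffsetResponse) {
      if (response.hasError)
        response.partitionErrorAndOffsets.values.foreach { partitionData =>
          if (partitionData.error != ErrorMapping.NoError)
            ErrorMapping.maybeThrowException(partitionData.error)
        }
    }
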
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Thu Sep 27 19:20:08 2012
@@ -66,12 +66,11 @@ case class ProducerResponse(versionId: S
       foldedTopics +
       Utils.shortStringLength(currTopic._1, RequestOrResponse.DefaultCharset) +
       4 + /* partition count for this topic */
-      currTopic._2.foldLeft (0) ((foldedPartitions, currPartition) => {
-        foldedPartitions +
+      currTopic._2.size * {
         4 + /* partition id */
         2 + /* error code */
         8 /* offset */
-      })
+      }
     })
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestOrResponse.scala Thu Sep 27 19:20:08 2012
@@ -25,6 +25,12 @@ object RequestOrResponse {
 }
 
 
+object Request {
+  val DefaultReplicaId = -1
+  val NonFollowerId = DefaultReplicaId
+}
+
+
 private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
 
   def sizeInBytes: Int

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Thu Sep 27 19:20:08 2012
@@ -20,7 +20,7 @@ package kafka.consumer
 import kafka.cluster.Broker
 import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
-import kafka.api.{FetchRequest, OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, PartitionData}
 import kafka.common.TopicAndPartition
 
 
@@ -30,7 +30,7 @@ class ConsumerFetcherThread(name: String
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
           socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
-          fetcherBrokerId = FetchRequest.NonFollowerId, maxWait = config.maxFetchWaitMs,
+          fetcherBrokerId = Request.NonFollowerId, maxWait = config.maxFetchWaitMs,
           minBytes = config.minFetchBytes) {
 
   // process fetched data
@@ -50,12 +50,13 @@ class ConsumerFetcherThread(name: String
       case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
-    val newOffset = simpleConsumer.getOffsetsBefore(topic, partitionId, startTimestamp, 1)(0)
-
+    val topicAndPartition = TopicAndPartition(topic, partitionId)
+    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
+    val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionId))
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
-    return newOffset
+    newOffset
   }
 
   // any logic for partitions whose leader has changed

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Sep 27 19:20:08 2012
@@ -20,10 +20,10 @@ package kafka.consumer
 import kafka.api._
 import kafka.network._
 import kafka.utils._
-import kafka.common.ErrorMapping
 import java.util.concurrent.TimeUnit
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
+
 /**
  * A consumer of kafka messages
  */
@@ -110,18 +110,10 @@ class SimpleConsumer( val host: String,
 
   /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
-   *  The result is a list of offsets, in descending order.
-   *
-   *  @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
-   *  @return an array of offsets
+   *  @param request a [[kafka.api.OffsetRequest]] object.
+   *  @return a [[kafka.api.OffsetResponse]] object.
    */
-  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] = {
-    val request = new OffsetRequest(topic, partition, time, maxNumOffsets)
-    val offsetResponse = OffsetResponse.readFrom(sendRequest(request).buffer)
-    // try to throw exception based on global error codes
-    ErrorMapping.maybeThrowException(offsetResponse.errorCode)
-    offsetResponse.offsets
-  }
+  def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
   private def getOrMakeConnection() {
     if(!blockingChannel.isConnected) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Sep 27 19:20:08 2012
@@ -27,10 +27,11 @@ import org.I0Itec.zkclient.exception.ZkN
 import java.net.InetAddress
 import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient}
 import org.apache.zookeeper.Watcher.Event.KeeperState
+import kafka.api.PartitionOffsetRequestInfo
 import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
-import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.Utils._
@@ -263,8 +264,9 @@ private[kafka] class ZookeeperConsumerCo
       }
       simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
                                             ConsumerConfig.SocketBufferSize)
-      val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, earliestOrLatest, 1)
-      producedOffset = offsets(0)
+      val topicAndPartition = TopicAndPartition(topic, partitionId)
+      val request = OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
+      producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     } catch {
       case e =>
         error("error in earliestOrLatestOffset() ", e)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Thu Sep 27 19:20:08 2012
@@ -30,6 +30,9 @@ private[javaapi] object Implicits extend
   implicit def toJavaTopicMetadataResponse(response: kafka.api.TopicMetadataResponse): kafka.javaapi.TopicMetadataResponse =
     new kafka.javaapi.TopicMetadataResponse(response)
 
+  implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse =
+    new kafka.javaapi.OffsetResponse(response)
+
   implicit def optionToJavaRef[T](opt: Option[T]): T = {
     opt match {
       case Some(obj) => obj

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetRequest.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import kafka.api.{Request, PartitionOffsetRequestInfo}
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+
+class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffsetRequestInfo],
+                    versionId: Short,
+                    clientId: String) {
+
+  val underlying = {
+    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    kafka.api.OffsetRequest(
+      requestInfo = scalaMap,
+      versionId = versionId,
+      clientId = clientId,
+      replicaId = Request.NonFollowerId
+    )
+  }
+
+
+  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
+
+
+  def sizeInBytes = underlying.sizeInBytes
+
+
+  override def toString = underlying.toString
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetRequest]
+    this.underlying.equals(otherOffsetRequest.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetRequest]
+
+
+  override def hashCode = underlying.hashCode
+
+}

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/OffsetResponse.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+
+class OffsetResponse(private val underlying: kafka.api.OffsetResponse) {
+
+  def hasError = underlying.hasError
+
+
+  def errorCode(topic: String, partition: Int) =
+    underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).error
+
+
+  def offsets(topic: String, partition: Int) =
+    underlying.partitionErrorAndOffsets(TopicAndPartition(topic, partition)).offsets.toArray
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetResponse = other.asInstanceOf[kafka.javaapi.OffsetResponse]
+    this.underlying.equals(otherOffsetResponse.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetResponse]
+
+
+  override def hashCode = underlying.hashCode
+
+
+  override def toString = underlying.toString
+
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala Thu Sep 27 19:20:08 2012
@@ -19,6 +19,8 @@ package kafka.javaapi.consumer
 
 import kafka.utils.threadsafe
 import kafka.javaapi.FetchResponse
+import kafka.javaapi.OffsetRequest
+
 
 /**
  * A consumer of kafka messages
@@ -67,13 +69,14 @@ class SimpleConsumer(val host: String,
 
   /**
    *  Get a list of valid offsets (up to maxSize) before the given time.
-   *  The result is a list of offsets, in descending order.
    *
-   *  @param time: time in millisecs (-1, from the latest offset available, -2 from the smallest offset available)
-   *  @return an array of offsets
+   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
+   *  @return a [[kafka.javaapi.OffsetResponse]] object.
    */
-  def getOffsetsBefore(topic: String, partition: Int, time: Long, maxNumOffsets: Int): Array[Long] =
-    underlying.getOffsetsBefore(topic, partition, time, maxNumOffsets)
+  def getOffsetsBefore(request: OffsetRequest): kafka.javaapi.OffsetResponse = {
+    import kafka.javaapi.Implicits._
+    underlying.getOffsetsBefore(request.underlying)
+  }
 
   def close() {
     underlying.close

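The javaapi wrapper takes a java.util.Map, as in the KafkaETLContext change above. A sketch of the calling pattern (written here in Scala, with placeholder topic/partition values; consumer is assumed to be an already-constructed kafka.javaapi.consumer.SimpleConsumer):

    import java.util.{HashMap => JHashMap}
    import kafka.api.PartitionOffsetRequestInfo
    import kafka.common.TopicAndPartition

    val requestInfo = new JHashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(new TopicAndPartition("topicA", 0),
                    new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1))
    val request = new kafka.javaapi.OffsetRequest(
      requestInfo, kafka.api.OffsetRequest.CurrentVersion, kafka.api.OffsetRequest.DefaultClientId)
    // val offsets: Array[Long] = consumer.getOffsetsBefore(request).offsets("topicA", 0)
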
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Sep 27 19:20:08 2012
@@ -81,12 +81,10 @@ object Log {
     nf.format(offset) + FileSuffix
   }
 
-  def getEmptyOffsets(request: OffsetRequest): Array[Long] = {
-    if (request.time == OffsetRequest.LatestTime || request.time == OffsetRequest.EarliestTime)
-      return Array(0L)
-    else
-      return Array()
-  }
+  def getEmptyOffsets(timestamp: Long): Seq[Long] =
+    if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
+      Seq(0L)
+    else Nil
 }
 
 
@@ -389,7 +387,7 @@ private[kafka] class Log( val dir: File,
      }
   }
 
-  def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
+  def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = segments.view
     var offsetTimeArray: Array[(Long, Long)] = null
     if (segsArray.last.size > 0)
@@ -403,7 +401,7 @@ private[kafka] class Log( val dir: File,
       offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)
 
     var startIndex = -1
-    request.time match {
+    timestamp match {
       case OffsetRequest.LatestTime =>
         startIndex = offsetTimeArray.length - 1
       case OffsetRequest.EarliestTime =>
@@ -413,20 +411,21 @@ private[kafka] class Log( val dir: File,
         debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
         startIndex = offsetTimeArray.length - 1
         while (startIndex >= 0 && !isFound) {
-          if (offsetTimeArray(startIndex)._2 <= request.time)
+          if (offsetTimeArray(startIndex)._2 <= timestamp)
             isFound = true
           else
             startIndex -=1
         }
     }
 
-    val retSize = request.maxNumOffsets.min(startIndex + 1)
+    val retSize = maxNumOffsets.min(startIndex + 1)
     val ret = new Array[Long](retSize)
     for (j <- 0 until retSize) {
       ret(j) = offsetTimeArray(startIndex)._1
       startIndex -= 1
     }
-    ret
+    // ensure that the returned seq is in descending order of offsets
+    ret.toSeq.sortBy(- _)
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Thu Sep 27 19:20:08 2012
@@ -23,7 +23,7 @@ import scala.collection._
 import kafka.server.KafkaConfig
 import kafka.api.OffsetRequest
 import kafka.log.Log._
-import kafka.common.KafkaException
+import kafka.common.{TopicAndPartition, KafkaException}
 
 /**
  * The guy who creates and hands out logs
@@ -104,11 +104,11 @@ private[kafka] class LogManager(val conf
     }
   }
 
-  def getOffsets(offsetRequest: OffsetRequest): Array[Long] = {
-    val log = getLog(offsetRequest.topic, offsetRequest.partition)
+  def getOffsets(topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
+    val log = getLog(topicAndPartition.topic, topicAndPartition.partition)
     log match {
-      case Some(l) => l.getOffsetsBefore(offsetRequest)
-      case None => getEmptyOffsets(offsetRequest)
+      case Some(l) => l.getOffsetsBefore(timestamp, maxNumOffsets)
+      case None => getEmptyOffsets(timestamp)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 27 19:20:08 2012
@@ -349,21 +349,36 @@ class KafkaApis(val requestChannel: Requ
       requestLogger.trace("Handling offset request " + offsetRequest.toString)
     trace("Handling offset request " + offsetRequest.toString)
 
-    var response: OffsetResponse = null
-    try {
-      // ensure leader exists
-      replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
-      val offsets = replicaManager.logManager.getOffsets(offsetRequest)
-      response = new OffsetResponse(offsetRequest.versionId, offsets)
-    } catch {
-      case ioe: IOException =>
-        fatal("Halting due to unrecoverable I/O error while handling producer request: " + ioe.getMessage, ioe)
-        System.exit(1)
-      case e =>
-        warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
-          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
-    }
+    val responseMap = offsetRequest.requestInfo.map(elem => {
+      val (topicAndPartition, partitionOffsetRequestInfo) = elem
+      try {
+        // ensure leader exists
+        val leader = replicaManager.getLeaderReplicaIfLocal(
+          topicAndPartition.topic, topicAndPartition.partition)
+        val offsets = {
+          val allOffsets = replicaManager.logManager.getOffsets(topicAndPartition,
+                                                                partitionOffsetRequestInfo.time,
+                                                                partitionOffsetRequestInfo.maxNumOffsets)
+          if (offsetRequest.isFromFollower) allOffsets
+          else {
+            val hw = leader.highWatermark
+            if (allOffsets.exists(_ > hw))
+              hw +: allOffsets.dropWhile(_ > hw)
+            else allOffsets
+          }
+        }
+        (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.NoError, offsets))
+      } catch {
+        case ioe: IOException =>
+          fatal("Halting due to unrecoverable I/O error while handling offset request: " + ioe.getMessage, ioe)
+          // compiler requires scala.sys.exit (not System.exit).
+          exit(1)
+        case e =>
+          warn("Error while responding to offset request", e)
+          (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), Nil) )
+      }
+    })
+    val response = OffsetResponse(OffsetRequest.CurrentVersion, responseMap)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

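The semantic change promised in the commit log lives in the non-follower branch above: regular consumers never see offsets beyond the leader's high watermark, while follower replicas get the raw log offsets. A minimal restatement of the capping rule (offsets assumed in descending order, as returned by Log.getOffsetsBefore):

    // Hypothetical standalone version of the branch in handleOffsetRequest:
    // followers get allOffsets unchanged; others are capped at the HW.
    def capToHighWatermark(allOffsets: Seq[Long], hw: Long): Seq[Long] =
      if (allOffsets.exists(_ > hw)) hw +: allOffsets.dropWhile(_ > hw)
      else allOffsets

    // e.g. capToHighWatermark(Seq(240L, 216L, 108L, 0L), 220L) == Seq(220L, 216L, 108L, 0L)
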
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Thu Sep 27 19:20:08 2012
@@ -17,7 +17,7 @@
 
 package kafka.server
 
-import kafka.api.{OffsetRequest, PartitionData}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest, PartitionData}
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
 import kafka.common.TopicAndPartition
@@ -51,10 +51,15 @@ class ReplicaFetcherThread(name:String, 
   // handle a partition whose offset is out of range and return a new fetch offset
   def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = {
     // This means the local replica is out of date. Truncate the log and catch up from beginning.
-    val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1)
+    val topicAndPartition = TopicAndPartition(topic, partitionId)
+    val request = OffsetRequest(
+      replicaId = brokerConfig.brokerId,
+      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
+    )
+    val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
     val replica = replicaMgr.getReplica(topic, partitionId).get
-    replica.log.get.truncateAndStartWithNewOffset(offsets(0))
-    return offsets(0)
+    replica.log.get.truncateAndStartWithNewOffset(offset)
+    offset
   }
 
   // any logic for partitions whose leader has changed

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Thu Sep 27 19:20:08 2012
@@ -23,9 +23,14 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{ZkUtils, ZKStringSerializer, Logging}
 import kafka.consumer.SimpleConsumer
 import collection.mutable.Map
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+import scala.collection._
+
+
 object ConsumerOffsetChecker extends Logging {
 
-  private val consumerMap: Map[String, Option[SimpleConsumer]] = Map()
+  private val consumerMap: mutable.Map[String, Option[SimpleConsumer]] = mutable.Map()
 
   private val BidPidPattern = """(\d+)-(\d+)""".r
 
@@ -61,8 +66,10 @@ object ConsumerOffsetChecker extends Log
           bid, getConsumer(zkClient, bid))
         consumerOpt match {
           case Some(consumer) =>
-            val logSize =
-              consumer.getOffsetsBefore(topic, pid.toInt, -1, 1).last.toLong
+            val topicAndPartition = TopicAndPartition(topic, pid.toInt)
+            val request =
+              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
             println("%20s%d".format("Log size = ", logSize))
             println("%20s%,d (%,.2fG)".format("= ", logSize, logSize / math.pow(1024, 3)))
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/GetOffsetShell.scala Thu Sep 27 19:20:08 2012
@@ -21,6 +21,9 @@ package kafka.tools
 import kafka.consumer._
 import joptsimple._
 import java.net.URI
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.TopicAndPartition
+
 
 object GetOffsetShell {
 
@@ -65,7 +68,9 @@ object GetOffsetShell {
     var time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
     val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)
-    val offsets = consumer.getOffsetsBefore(topic, partition, time, nOffsets)
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
+    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
     println("get " + offsets.length + " results")
     for (offset <- offsets)
       println(offset)

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala?rev=1391168&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala Thu Sep 27 19:20:08 2012
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.tools
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.{SimpleConsumer, ConsumerConfig}
+import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
+import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils, ZKStringSerializer, Utils}
+
+
+/**
+ *  A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
+ */
+object UpdateOffsetsInZK {
+  val Earliest = "earliest"
+  val Latest = "latest"
+
+  def main(args: Array[String]) {
+    if(args.length < 3)
+      usage
+    val config = new ConsumerConfig(Utils.loadProps(args(1)))
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs,
+        config.zkConnectionTimeoutMs, ZKStringSerializer)
+    args(0) match {
+      case Earliest => getAndSetOffsets(zkClient, OffsetRequest.EarliestTime, config, args(2))
+      case Latest => getAndSetOffsets(zkClient, OffsetRequest.LatestTime, config, args(2))
+      case _ => usage
+    }
+  }
+
+  private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
+    val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic))
+    var partitions: Seq[Int] = Nil
+
+    partitionsPerTopicMap.get(topic) match {
+      case Some(l) =>  partitions = l.sortWith((s,t) => s < t)
+      case _ => throw new RuntimeException("Can't find topic " + topic)
+    }
+
+    var numParts = 0
+    for (partition <- partitions) {
+      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+
+      val broker = brokerHostingPartition match {
+        case Some(b) => b
+        case None => throw new KafkaException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
+          "getOffsetsBefore request")
+      }
+
+      ZkUtils.getBrokerInfo(zkClient, broker) match {
+        case Some(brokerInfo) =>
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          val topicAndPartition = TopicAndPartition(topic, partition)
+          val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
+          val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+          val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
+
+          println("updating partition " + partition + " with new offset: " + offset)
+          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)
+          numParts += 1
+        case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+      }
+    }
+    println("updated the offset for " + numParts + " partitions")
+  }
+
+  private def usage() = {
+    println("USAGE: " + UpdateOffsetsInZK.getClass.getName + " [earliest | latest] consumer.properties topic")
+    System.exit(1)
+  }
+}

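For reference, the relocated tool keeps the command-line synopsis printed by its usage() method:

    kafka.tools.UpdateOffsetsInZK [earliest | latest] consumer.properties topic
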
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Sep 27 19:20:08 2012
@@ -31,6 +31,7 @@ import joptsimple.{OptionSpec, OptionSet
 import kafka.common.KafkaException
 import kafka.cluster.Broker
 import util.parsing.json.JSON
+import kafka.api.RequestOrResponse
 import kafka.api.{TopicMetadataRequest, TopicMetadataResponse}
 import kafka.producer.{ProducerPool, SyncProducer}
 
@@ -155,7 +156,7 @@ object Utils extends Logging {
    * @param buffer The buffer to read from
    * @param encoding The encoding in which to read the string
    */
-  def readShortString(buffer: ByteBuffer, encoding: String = "UTF-8"): String = {
+  def readShortString(buffer: ByteBuffer, encoding: String = RequestOrResponse.DefaultCharset): String = {
     val size: Int = buffer.getShort()
     if(size < 0)
       return null
@@ -170,7 +171,7 @@ object Utils extends Logging {
    * @param string The string to write
    * @param encoding The encoding in which to write the string
    */
-  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = "UTF-8"): Unit = {
+  def writeShortString(buffer: ByteBuffer, string: String, encoding: String = RequestOrResponse.DefaultCharset) {
     if(string == null) {
       buffer.putShort(-1)
     } else if(string.length > Short.MaxValue) {
@@ -186,7 +187,7 @@ object Utils extends Logging {
    * @param string The string to write
    * @param encoding The encoding in which to write the string
    */
-  def shortStringLength(string: String, encoding: String = "UTF-8"): Int = {
+  def shortStringLength(string: String, encoding: String = RequestOrResponse.DefaultCharset): Int = {
     if(string == null) {
       2
     } else {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Thu Sep 27 19:20:08 2012
@@ -22,16 +22,15 @@ import kafka.utils._
 import kafka.server.{KafkaConfig, KafkaServer}
 import junit.framework.Assert._
 import java.util.{Random, Properties}
-import collection.mutable.WrappedArray
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
 import kafka.admin.CreateTopicCommand
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.utils.TestUtils._
-import kafka.common.UnknownTopicOrPartitionException
+import kafka.common.{ErrorMapping, TopicAndPartition, UnknownTopicOrPartitionException}
 
 object LogOffsetTest {
   val random = new Random()  
@@ -67,12 +66,12 @@ class LogOffsetTest extends JUnit3Suite 
 
   @Test
   def testGetOffsetsForUnknownTopic() {
-    try {
-      simpleConsumer.getOffsetsBefore("foo", 0, OffsetRequest.LatestTime, 10)
-      fail("Should fail with UnknownTopicException since topic foo was never created")
-    }catch {
-      case e: UnknownTopicOrPartitionException => // this is ok
-    }
+    val topicAndPartition = TopicAndPartition("foo", 0)
+    val request = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)))
+    val offsetResponse = simpleConsumer.getOffsetsBefore(request)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode,
+                 offsetResponse.partitionErrorAndOffsets(topicAndPartition).error)
   }
 
   @Test
@@ -92,16 +91,17 @@ class LogOffsetTest extends JUnit3Suite 
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-
-    val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
-
-    val offsets = log.getOffsetsBefore(offsetRequest)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    val offsets = log.getOffsetsBefore(OffsetRequest.LatestTime, 10)
+    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
-                                                          OffsetRequest.LatestTime, 10)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 10)),
+      replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
 
     // try to fetch using latest offset
     val fetchResponse = simpleConsumer.fetch(
@@ -124,8 +124,11 @@ class LogOffsetTest extends JUnit3Suite 
 
     var offsetChanged = false
     for(i <- 1 to 14) {
-      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, 0,
-        OffsetRequest.EarliestTime, 1)
+      val topicAndPartition = TopicAndPartition(topic, 0)
+      val offsetRequest =
+        OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
+      val consumerOffsets =
+        simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
 
       if(consumerOffsets(0) == 1) {
         offsetChanged = true
@@ -153,14 +156,15 @@ class LogOffsetTest extends JUnit3Suite 
     time.sleep(20)
     val now = time.milliseconds
 
-    val offsetRequest = new OffsetRequest(topic, part, now, 10)
-    val offsets = log.getOffsetsBefore(offsetRequest)
-    println("Offsets = " + offsets.mkString(","))
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]))
+    val offsets = log.getOffsetsBefore(now, 10)
+    assertEquals(Seq(240L, 216L, 108L, 0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
-    assertTrue((Array(240L, 216L, 108L, 0L): WrappedArray[Long]) == (consumerOffsets: WrappedArray[Long]))
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(now, 10)), replicaId = 0)
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(240L, 216L, 108L, 0L), consumerOffsets)
   }
 
   @Test
@@ -179,16 +183,17 @@ class LogOffsetTest extends JUnit3Suite 
       log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
     log.flush()
 
-    val offsetRequest = new OffsetRequest(topic, part,
-                                          OffsetRequest.EarliestTime, 10)
-    val offsets = log.getOffsetsBefore(offsetRequest)
+    val offsets = log.getOffsetsBefore(OffsetRequest.EarliestTime, 10)
 
-    assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    assertEquals(Seq(0L), offsets)
 
     waitUntilTrue(() => isLeaderLocalOnBroker(topic, part, server), 1000)
-    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
-                                                          OffsetRequest.EarliestTime, 10)
-    assertTrue( (Array(0L): WrappedArray[Long]) == (offsets: WrappedArray[Long]) )
+    val topicAndPartition = TopicAndPartition(topic, part)
+    val offsetRequest =
+      OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 10)))
+    val consumerOffsets =
+      simpleConsumer.getOffsetsBefore(offsetRequest).partitionErrorAndOffsets(topicAndPartition).offsets
+    assertEquals(Seq(0L), consumerOffsets)
   }
 
   private def createBrokerConfig(nodeId: Int, port: Int): Properties = {

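The LogOffsetTest changes above trace the API migration: every call site moves from the old per-partition getOffsetsBefore(topic, partition, time, maxNumOffsets) signature to a single batched OffsetRequest keyed by TopicAndPartition, with errors surfaced as per-partition codes in the response instead of thrown exceptions. A minimal sketch of the new usage outside the test suite (the broker address, topic name, and partition count are made-up values for illustration):

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.{ErrorMapping, TopicAndPartition}
    import kafka.consumer.SimpleConsumer

    object BatchedOffsetFetch {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 30 * 1000, 64 * 1024)
        try {
          // One request now carries several partitions, each with its own
          // target time and maximum number of offsets to return.
          val partitions = (0 until 3).map(p => TopicAndPartition("test-topic", p))
          val request = OffsetRequest(
            partitions.map(tp => tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)).toMap)

          val response = consumer.getOffsetsBefore(request)
          for ((tp, partitionData) <- response.partitionErrorAndOffsets) {
            // Per-partition error codes replace the old UnknownTopicOrPartitionException.
            if (partitionData.error == ErrorMapping.NoError)
              println("%s -> latest offset %d".format(tp, partitionData.offsets.head))
            else
              println("%s -> error code %d".format(tp, partitionData.error))
          }
        } finally {
          consumer.close()
        }
      }
    }
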
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Thu Sep 27 19:20:08 2012
@@ -109,12 +109,15 @@ object RpcDataSerializationTestUtils{
     FetchResponse(1, 1, topicData)
   }
 
-  def createTestOffsetRequest: OffsetRequest = {
-    new OffsetRequest(topic1, 1, 1000, 200)
-  }
+  def createTestOffsetRequest = new OffsetRequest(
+      collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)),
+      replicaId = 0
+  )
 
   def createTestOffsetResponse: OffsetResponse = {
-    new OffsetResponse(1, Array(1000l, 2000l, 3000l, 4000l))
+    new OffsetResponse(OffsetRequest.CurrentVersion, collection.immutable.Map(
+      TopicAndPartition(topic1, 1) -> PartitionOffsetsResponse(ErrorMapping.NoError, Seq(1000l, 2000l, 3000l, 4000l)))
+    )
   }
 
   def createTestTopicMetadataRequest: TopicMetadataRequest = {
@@ -191,7 +194,7 @@ class RpcDataSerializationTest extends J
     assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
                  deserializedFetchRequest)
 
-    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes())
+    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
     offsetRequest.writeTo(buffer)
     buffer.rewind()
     val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)

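The serialization test now builds the batched request and the keyed response, and sizeInBytes has become a parameterless accessor. For reference, the same round trip can be sketched outside JUnit, using only the constructors and read/write methods exercised above (the topic name is a stand-in literal):

    import java.nio.ByteBuffer
    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition

    object OffsetRequestRoundTrip {
      def main(args: Array[String]) {
        val request = OffsetRequest(
          Map(TopicAndPartition("topic1", 1) -> PartitionOffsetRequestInfo(1000L, 200)),
          replicaId = 0)

        // Size the buffer from the request itself, then serialize and rewind.
        val buffer = ByteBuffer.allocate(request.sizeInBytes)
        request.writeTo(buffer)
        buffer.rewind()

        // Deserialize and check that the wire format preserves the request.
        val deserialized = OffsetRequest.readFrom(buffer)
        assert(request == deserialized, "round trip should preserve the request")
      }
    }
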
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Thu Sep 27 19:20:08 2012
@@ -19,12 +19,16 @@ package kafka.server
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.RequestChannel
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.utils.{Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.api._
+import scala.Some
+import org.junit.Assert._
+import kafka.common.TopicAndPartition
+
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -88,7 +92,7 @@ class SimpleFetchTest extends JUnit3Suit
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
-      .replicaId(FetchRequest.NonFollowerId)
+      .replicaId(Request.NonFollowerId)
       .addFetch(topic, partitionId, 0, hw*2)
       .build()
     val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
@@ -98,6 +102,33 @@ class SimpleFetchTest extends JUnit3Suit
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
+
+    // Test offset request from non-replica
+    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
+    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+    EasyMock.reset(logManager)
+    EasyMock.reset(replicaManager)
+
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+    EasyMock.replay(replicaManager)
+    EasyMock.replay(logManager)
+
+    apis.handleOffsetRequest(new RequestChannel.Request(processor = 0,
+                                                        requestKey = 5,
+                                                        buffer = offsetRequestBB,
+                                                        startTimeNs = 1))
+    val offsetResponseBuffer = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+    EasyMock.verify(replicaManager)
+    EasyMock.verify(logManager)
+    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+    assertEquals(hw.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   /**
@@ -173,6 +204,34 @@ class SimpleFetchTest extends JUnit3Suit
      * an offset of 15
      */
     EasyMock.verify(log)
+
+    // Test offset request from replica
+    val topicAndPartition = TopicAndPartition(topic, partition.partitionId)
+    val offsetRequest = OffsetRequest(
+      Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)),
+      replicaId = followerReplicaId)
+    val offsetRequestBB = TestUtils.createRequestByteBuffer(offsetRequest)
+
+    EasyMock.reset(logManager)
+    EasyMock.reset(replicaManager)
+
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(logManager.getOffsets(topicAndPartition, OffsetRequest.LatestTime, 1)).andReturn(Seq(leo))
+
+    EasyMock.replay(replicaManager)
+    EasyMock.replay(logManager)
+
+    apis.handleOffsetRequest(new RequestChannel.Request(processor = 1,
+                                                        requestKey = 5,
+                                                        buffer = offsetRequestBB,
+                                                        startTimeNs = 1))
+    val offsetResponseBuffer = requestChannel.receiveResponse(1).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
+    val offsetResponse = OffsetResponse.readFrom(offsetResponseBuffer)
+    EasyMock.verify(replicaManager)
+    EasyMock.verify(logManager)
+    assertEquals(1, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.size)
+    assertEquals(leo.toLong, offsetResponse.partitionErrorAndOffsets(topicAndPartition).offsets.head)
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,

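The two new assertion blocks pin down the behavioral change described in the commit log: for a latest-offset query, a non-follower gets back the high watermark (hw), while a request stamped with a follower's replica id gets the leader's log end offset (leo). A small sketch of constructing both request flavors (topic, partition, and follower id are illustrative; the non-follower default for replicaId is inferred from the first block above):

    import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
    import kafka.common.TopicAndPartition

    object LatestOffsetSemantics {
      def main(args: Array[String]) {
        val tp = TopicAndPartition("test-topic", 0)
        val info = Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))

        // Ordinary consumer: replicaId is left at its non-follower default,
        // so the leader answers with the high watermark.
        val consumerRequest = OffsetRequest(info)

        // Follower fetcher: carries its broker id, so the leader answers
        // with its log end offset instead.
        val followerRequest = OffsetRequest(info, replicaId = 1)

        println(consumerRequest)
        println(followerRequest)
      }
    }
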
Modified: incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala?rev=1391168&r1=1391167&r2=1391168&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala (original)
+++ incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/SimpleConsumerPerformance.scala Thu Sep 27 19:20:08 2012
@@ -19,11 +19,12 @@ package kafka.perf
 
 import java.net.URI
 import java.text.SimpleDateFormat
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.api.{PartitionOffsetRequestInfo, FetchRequestBuilder, OffsetRequest}
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.apache.log4j.Logger
-import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicAndPartition
+
 
 /**
  * Performance test for the simple consumer
@@ -44,8 +45,11 @@ object SimpleConsumerPerformance {
     val consumer = new SimpleConsumer(config.url.getHost, config.url.getPort, 30*1000, 2*config.fetchSize)
 
     // reset to latest or smallest offset
-    var offset: Long = if(config.fromLatest) consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.LatestTime, 1).head
-                       else consumer.getOffsetsBefore(config.topic, config.partition, OffsetRequest.EarliestTime, 1).head
+    val topicAndPartition = TopicAndPartition(config.topic, config.partition)
+    val request = OffsetRequest(Map(
+      topicAndPartition -> PartitionOffsetRequestInfo(if (config.fromLatest) OffsetRequest.LatestTime else OffsetRequest.EarliestTime, 1)
+      ))
+    var offset: Long = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
 
     val startMs = System.currentTimeMillis
     var done = false