You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/13 21:12:03 UTC

git commit: KAFKA-763 follow up changes; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 c5462864a -> 0a9283530


KAFKA-763 follow up changes; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a928353
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a928353
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a928353

Branch: refs/heads/0.8
Commit: 0a9283530418781941b7c232a8b09e4ec071b0e9
Parents: c546286
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Mar 13 13:11:47 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 13 13:11:54 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   18 +++---------
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   22 +++++++-------
 .../scala/kafka/tools/SimpleConsumerShell.scala    |    5 +--
 3 files changed, 17 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index dedbb50..5a0784a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -136,22 +136,12 @@ class SimpleConsumer(val host: String,
    * Get the earliest or latest offset of a given topic, partition.
    * @param topicAndPartition Topic and partition of which the offset is needed.
    * @param earliestOrLatest A value to indicate earliest or latest offset.
-   * @param consumerId Id of the consumer which could be a client or a follower broker.
-   * @param isFromOrdinaryConsumer Boolean to specify ordinary consumer or debugging consumer.
+   * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker.
    * @return Requested offset.
    */
-  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition,
-                             earliestOrLatest: Long,
-                             consumerId: Int = Request.OrdinaryConsumerId,
-                             isFromOrdinaryConsumer: Boolean = true): Long = {
-    val request =
-      if(isFromOrdinaryConsumer)
-       OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                     replicaId = consumerId)
-      else
-       OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                     replicaId = Request.DebuggingConsumerId)
-
+  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
+    val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+                                replicaId = consumerId)
     val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
     val offset = partitionErrorAndOffset.error match {
       case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index edd3164..d4f15c1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,18 +83,18 @@ class ReplicaFetcherThread(name:String,
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
     if (leaderEndOffset < log.logEndOffset) {
       log.truncateTo(leaderEndOffset)
-      return leaderEndOffset
+      leaderEndOffset
+    } else {
+      /**
+       * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+       * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+       *
+       * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+       */
+      val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+      log.truncateAndStartWithNewOffset(leaderStartOffset)
+      leaderStartOffset
     }
-
-    /**
-     * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
-     * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
-     *
-     * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
-     */
-    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
-    log.truncateAndStartWithNewOffset(leaderStartOffset)
-    leaderStartOffset
   }
 
   // any logic for partitions whose leader has changed

http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index cdfb1b5..8f274df 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -165,9 +165,8 @@ object SimpleConsumerShell extends Logging {
       val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
                                               ConsumerConfig.SocketBufferSize, clientId)
       try {
-        startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId),
-                                                               earliestOrLatest = startingOffset,
-                                                               isFromOrdinaryConsumer = false)
+        startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
+                                                               Request.DebuggingConsumerId)
       } catch {
         case t: Throwable =>
           System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))