You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/13 04:59:29 UTC

git commit: kafka-763; Add an option to replicate from the largest offset during unclean leader election; patched by Swapnil Ghike; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 485afe646 -> 290d5e0ea


kafka-763; Add an option to replicate from the largest offset during unclean leader election; patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/290d5e0e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/290d5e0e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/290d5e0e

Branch: refs/heads/0.8
Commit: 290d5e0eac38e9917c64353a131154821b899f26
Parents: 485afe6
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Mar 12 20:59:09 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 12 20:59:09 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    3 +-
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   79 ++++++---------
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   43 ++++++---
 .../scala/kafka/tools/SimpleConsumerShell.scala    |   19 +++-
 4 files changed, 78 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 2ebd72a..e9cfd10 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -63,7 +63,8 @@ object ConsumerConfig extends Config {
     autoOffsetReset match {
       case OffsetRequest.SmallestTimeString =>
       case OffsetRequest.LargestTimeString =>
-      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+      case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of auto.offset.reset in ConsumerConfig; " +
+                                                 "Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6b83deb..dedbb50 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -22,59 +22,10 @@ import kafka.network._
 import kafka.utils._
 import kafka.utils.ZkUtils._
 import collection.immutable
-import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.common.{ErrorMapping, TopicAndPartition, KafkaException}
 import org.I0Itec.zkclient.ZkClient
 import kafka.cluster.Broker
 
-
-object SimpleConsumer extends Logging {
-  def earliestOrLatestOffset(broker: Broker, 
-                             topic: String, 
-                             partitionId: Int, 
-                             earliestOrLatest: Long,
-                             clientId: String, 
-                             isFromOrdinaryConsumer: Boolean): Long = {
-    var simpleConsumer: SimpleConsumer = null
-    var producedOffset: Long = -1L
-    try {
-      simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
-                                          ConsumerConfig.SocketBufferSize, clientId)
-      val topicAndPartition = TopicAndPartition(topic, partitionId)
-      val request = if(isFromOrdinaryConsumer)
-        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
-      else
-        new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
-                          0, Request.DebuggingConsumerId)
-      producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    } catch {
-      case e =>
-        error("error in earliestOrLatestOffset() ", e)
-    }
-    finally {
-      if (simpleConsumer != null)
-        simpleConsumer.close()
-    }
-    producedOffset
-  }
-
-  def earliestOrLatestOffset(zkClient: ZkClient, 
-                             topic: String, 
-                             brokerId: Int, 
-                             partitionId: Int,
-                             earliestOrLatest: Long, 
-                             clientId: String, 
-                             isFromOrdinaryConsumer: Boolean = true): Long = {
-    val cluster = getCluster(zkClient)
-    val broker = cluster.getBroker(brokerId) match {
-      case Some(b) => b
-      case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
-                                                    "getOffsetsBefore request")
-    }
-    earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
-  }
-}
-
-
 /**
  * A consumer of kafka messages
  */
@@ -180,5 +131,33 @@ class SimpleConsumer(val host: String,
       connect()
     }
   }
+
+  /**
+   * Get the earliest or latest offset of a topic partition via getOffsetsBefore, rethrowing any broker-reported error.
+   * @param topicAndPartition Topic and partition of which the offset is needed.
+   * @param earliestOrLatest Time value for the request, e.g. OffsetRequest.EarliestTime or OffsetRequest.LatestTime.
+   * @param consumerId Replica id sent with the request (e.g. a follower broker's id); defaults to Request.OrdinaryConsumerId.
+   * @param isFromOrdinaryConsumer If false, consumerId is ignored and Request.DebuggingConsumerId is sent as the replica id.
+   * @return The first offset in the response; throws ErrorMapping.exceptionFor(error) if the response carries an error code.
+   */
+  def earliestOrLatestOffset(topicAndPartition: TopicAndPartition,
+                             earliestOrLatest: Long,
+                             consumerId: Int = Request.OrdinaryConsumerId,
+                             isFromOrdinaryConsumer: Boolean = true): Long = {
+    val request =
+      if(isFromOrdinaryConsumer)
+       OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+                     replicaId = consumerId)
+      else
+       OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+                     replicaId = Request.DebuggingConsumerId)
+
+    val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val offset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
+    offset
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 37b71be..edd3164 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -63,21 +63,38 @@ class ReplicaFetcherThread(name:String,
     }
   }
 
-  // handle a partition whose offset is out of range and return a new fetch offset
+  /**
+   * Handle a partition whose fetch offset is out of range: truncate the local log and return the offset to resume fetching from.
+   */
   def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
-    // This means the local replica is out of date. Truncate the log and catch up from beginning.
-    val request = OffsetRequest(
-      replicaId = brokerConfig.brokerId,
-      requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
-    )
-    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
-    val offset = partitionErrorAndOffset.error match {
-      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
-      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
-    }
     val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
-    replica.log.get.truncateAndStartWithNewOffset(offset)
-    offset
+    val log = replica.log.get
+
+    /**
+     * Unclean leader election: a follower goes down while the leader keeps appending messages. The follower comes back up,
+     * and before it has fully caught up with the leader's log, all replicas in the ISR go down. The follower is then uncleanly
+     * elected as the new leader and starts appending messages from clients. When the old leader comes back up and becomes a
+     * follower, it may discover that the current leader's end offset is behind its own end offset.
+     *
+     * In that case, truncate this follower's log to the current leader's end offset and continue fetching from there.
+     *
+     * The two replicas' logs may still disagree on messages below that offset; this mismatch is not fixed here.
+     */
+    val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
+    if (leaderEndOffset < log.logEndOffset) {
+      log.truncateTo(leaderEndOffset)
+      return leaderEndOffset
+    }
+
+    /**
+     * Otherwise (leader's end offset >= local end offset), assume the follower was down long enough for the leader to delete
+     * old log segments, so the local end offset is below the leader's start offset (log.logEndOffset < leaderStartOffset).
+     *
+     * NOTE: that condition is assumed, not checked. Restart the local log at the leader's start offset and continue fetching.
+     */
+    val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+    log.truncateAndStartWithNewOffset(leaderStartOffset)
+    leaderStartOffset
   }
 
   // any logic for partitions whose leader has changed

http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 8cdbb00..cdfb1b5 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -24,6 +24,7 @@ import kafka.client.ClientUtils
 import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
 import kafka.cluster.Broker
 import scala.collection.JavaConversions._
+import kafka.common.TopicAndPartition
 
 /**
  * Command line program to dump out messages to standard out using the simple consumer
@@ -160,8 +161,22 @@ object SimpleConsumerShell extends Logging {
       System.err.println("Invalid starting offset: %d".format(startingOffset))
       System.exit(1)
     }
-    if(startingOffset < 0)
-      startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
+    if (startingOffset < 0) {
+      val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
+                                              ConsumerConfig.SocketBufferSize, clientId)
+      try {
+        startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId),
+                                                               earliestOrLatest = startingOffset,
+                                                               isFromOrdinaryConsumer = false)
+      } catch {
+        case t: Throwable =>
+          System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
+          System.exit(1)
+      } finally {
+        if (simpleConsumer != null)
+          simpleConsumer.close()
+      }
+    }
 
     // initializing formatter
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]