You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/13 04:59:29 UTC
git commit: kafka-763;
Add an option to replicate from the largest offset during unclean
leader election; patched by Swapnil Ghike; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 485afe646 -> 290d5e0ea
kafka-763; Add an option to replicate from the largest offset during unclean leader election; patched by Swapnil Ghike; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/290d5e0e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/290d5e0e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/290d5e0e
Branch: refs/heads/0.8
Commit: 290d5e0eac38e9917c64353a131154821b899f26
Parents: 485afe6
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Tue Mar 12 20:59:09 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Mar 12 20:59:09 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/consumer/ConsumerConfig.scala | 3 +-
.../main/scala/kafka/consumer/SimpleConsumer.scala | 79 ++++++---------
.../scala/kafka/server/ReplicaFetcherThread.scala | 43 ++++++---
.../scala/kafka/tools/SimpleConsumerShell.scala | 19 +++-
4 files changed, 78 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 2ebd72a..e9cfd10 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -63,7 +63,8 @@ object ConsumerConfig extends Config {
autoOffsetReset match {
case OffsetRequest.SmallestTimeString =>
case OffsetRequest.LargestTimeString =>
- case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of autoOffsetReset in ConsumerConfig")
+ case _ => throw new InvalidConfigException("Wrong value " + autoOffsetReset + " of auto.offset.reset in ConsumerConfig; " +
+ "Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6b83deb..dedbb50 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -22,59 +22,10 @@ import kafka.network._
import kafka.utils._
import kafka.utils.ZkUtils._
import collection.immutable
-import kafka.common.{TopicAndPartition, KafkaException}
+import kafka.common.{ErrorMapping, TopicAndPartition, KafkaException}
import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
-
-object SimpleConsumer extends Logging {
- def earliestOrLatestOffset(broker: Broker,
- topic: String,
- partitionId: Int,
- earliestOrLatest: Long,
- clientId: String,
- isFromOrdinaryConsumer: Boolean): Long = {
- var simpleConsumer: SimpleConsumer = null
- var producedOffset: Long = -1L
- try {
- simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
- ConsumerConfig.SocketBufferSize, clientId)
- val topicAndPartition = TopicAndPartition(topic, partitionId)
- val request = if(isFromOrdinaryConsumer)
- new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)))
- else
- new OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
- 0, Request.DebuggingConsumerId)
- producedOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
- } catch {
- case e =>
- error("error in earliestOrLatestOffset() ", e)
- }
- finally {
- if (simpleConsumer != null)
- simpleConsumer.close()
- }
- producedOffset
- }
-
- def earliestOrLatestOffset(zkClient: ZkClient,
- topic: String,
- brokerId: Int,
- partitionId: Int,
- earliestOrLatest: Long,
- clientId: String,
- isFromOrdinaryConsumer: Boolean = true): Long = {
- val cluster = getCluster(zkClient)
- val broker = cluster.getBroker(brokerId) match {
- case Some(b) => b
- case None => throw new KafkaException("Broker " + brokerId + " is unavailable. Cannot issue " +
- "getOffsetsBefore request")
- }
- earliestOrLatestOffset(broker, topic, partitionId, earliestOrLatest, clientId, isFromOrdinaryConsumer)
- }
-}
-
-
/**
* A consumer of kafka messages
*/
@@ -180,5 +131,33 @@ class SimpleConsumer(val host: String,
connect()
}
}
+
+ /**
+ * Get the earliest or latest offset of a given topic, partition.
+ * @param topicAndPartition Topic and partition of which the offset is needed.
+ * @param earliestOrLatest A value to indicate earliest or latest offset.
+ * @param consumerId Id of the consumer which could be a client or a follower broker.
+ * @param isFromOrdinaryConsumer Boolean to specify ordinary consumer or debugging consumer.
+ * @return Requested offset.
+ */
+ def earliestOrLatestOffset(topicAndPartition: TopicAndPartition,
+ earliestOrLatest: Long,
+ consumerId: Int = Request.OrdinaryConsumerId,
+ isFromOrdinaryConsumer: Boolean = true): Long = {
+ val request =
+ if(isFromOrdinaryConsumer)
+ OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+ replicaId = consumerId)
+ else
+ OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+ replicaId = Request.DebuggingConsumerId)
+
+ val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+ val offset = partitionErrorAndOffset.error match {
+ case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+ case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+ }
+ offset
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 37b71be..edd3164 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -63,21 +63,38 @@ class ReplicaFetcherThread(name:String,
}
}
- // handle a partition whose offset is out of range and return a new fetch offset
+ /**
+ * Handle a partition whose offset is out of range and return a new fetch offset.
+ */
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
- // This means the local replica is out of date. Truncate the log and catch up from beginning.
- val request = OffsetRequest(
- replicaId = brokerConfig.brokerId,
- requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
- )
- val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
- val offset = partitionErrorAndOffset.error match {
- case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
- case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
- }
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
- replica.log.get.truncateAndStartWithNewOffset(offset)
- offset
+ val log = replica.log.get
+
+ /**
+ * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
+ * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
+ * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
+ * and it may discover that the current leader's end offset is behind its own end offset.
+ *
+ * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
+ *
+ * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
+ */
+ val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
+ if (leaderEndOffset < log.logEndOffset) {
+ log.truncateTo(leaderEndOffset)
+ return leaderEndOffset
+ }
+
+ /**
+ * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+ * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+ *
+ * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+ */
+ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+ log.truncateAndStartWithNewOffset(leaderStartOffset)
+ leaderStartOffset
}
// any logic for partitions whose leader has changed
http://git-wip-us.apache.org/repos/asf/kafka/blob/290d5e0e/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index 8cdbb00..cdfb1b5 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -24,6 +24,7 @@ import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import scala.collection.JavaConversions._
+import kafka.common.TopicAndPartition
/**
* Command line program to dump out messages to standard out using the simple consumer
@@ -160,8 +161,22 @@ object SimpleConsumerShell extends Logging {
System.err.println("Invalid starting offset: %d".format(startingOffset))
System.exit(1)
}
- if(startingOffset < 0)
- startingOffset = SimpleConsumer.earliestOrLatestOffset(fetchTargetBroker, topic, partitionId, startingOffset, clientId, false)
+ if (startingOffset < 0) {
+ val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
+ ConsumerConfig.SocketBufferSize, clientId)
+ try {
+ startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId),
+ earliestOrLatest = startingOffset,
+ isFromOrdinaryConsumer = false)
+ } catch {
+ case t: Throwable =>
+ System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
+ System.exit(1)
+ } finally {
+ if (simpleConsumer != null)
+ simpleConsumer.close()
+ }
+ }
// initializing formatter
val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]