You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/13 21:12:03 UTC
git commit: KAFKA-763 follow up changes; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 c5462864a -> 0a9283530
KAFKA-763 follow up changes; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0a928353
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0a928353
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0a928353
Branch: refs/heads/0.8
Commit: 0a9283530418781941b7c232a8b09e4ec071b0e9
Parents: c546286
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Wed Mar 13 13:11:47 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Mar 13 13:11:54 2013 -0700
----------------------------------------------------------------------
.../main/scala/kafka/consumer/SimpleConsumer.scala | 18 +++---------
.../scala/kafka/server/ReplicaFetcherThread.scala | 22 +++++++-------
.../scala/kafka/tools/SimpleConsumerShell.scala | 5 +--
3 files changed, 17 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index dedbb50..5a0784a 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -136,22 +136,12 @@ class SimpleConsumer(val host: String,
* Get the earliest or latest offset of a given topic, partition.
* @param topicAndPartition Topic and partition of which the offset is needed.
* @param earliestOrLatest A value to indicate earliest or latest offset.
- * @param consumerId Id of the consumer which could be a client or a follower broker.
- * @param isFromOrdinaryConsumer Boolean to specify ordinary consumer or debugging consumer.
+ * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker.
* @return Requested offset.
*/
- def earliestOrLatestOffset(topicAndPartition: TopicAndPartition,
- earliestOrLatest: Long,
- consumerId: Int = Request.OrdinaryConsumerId,
- isFromOrdinaryConsumer: Boolean = true): Long = {
- val request =
- if(isFromOrdinaryConsumer)
- OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
- replicaId = consumerId)
- else
- OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
- replicaId = Request.DebuggingConsumerId)
-
+ def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
+ val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
+ replicaId = consumerId)
val partitionErrorAndOffset = getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
val offset = partitionErrorAndOffset.error match {
case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index edd3164..d4f15c1 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -83,18 +83,18 @@ class ReplicaFetcherThread(name:String,
val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
if (leaderEndOffset < log.logEndOffset) {
log.truncateTo(leaderEndOffset)
- return leaderEndOffset
+ leaderEndOffset
+ } else {
+ /**
+ * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
+ * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
+ *
+ * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
+ */
+ val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
+ log.truncateAndStartWithNewOffset(leaderStartOffset)
+ leaderStartOffset
}
-
- /**
- * The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's
- * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset).
- *
- * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching.
- */
- val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
- log.truncateAndStartWithNewOffset(leaderStartOffset)
- leaderStartOffset
}
// any logic for partitions whose leader has changed
http://git-wip-us.apache.org/repos/asf/kafka/blob/0a928353/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
index cdfb1b5..8f274df 100644
--- a/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
+++ b/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -165,9 +165,8 @@ object SimpleConsumerShell extends Logging {
val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
ConsumerConfig.SocketBufferSize, clientId)
try {
- startingOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition = TopicAndPartition(topic, partitionId),
- earliestOrLatest = startingOffset,
- isFromOrdinaryConsumer = false)
+ startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
+ Request.DebuggingConsumerId)
} catch {
case t: Throwable =>
System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))