You are viewing a plain-text version of this content; the canonical (HTML) version is available from the original mailing-list archive entry, whose link was not preserved in this extraction.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/17 18:36:32 UTC
git commit: Consumer rebalance fails if no leader available for a
partition and stops all fetchers; patched by Maxime Brugidou;
reviewed by Jun Rao; kafka-693
Updated Branches:
refs/heads/0.8 da7f14676 -> 214a0af46
Consumer rebalance fails if no leader available for a partition and stops all fetchers; patched by Maxime Brugidou; reviewed by Jun Rao; kafka-693
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/214a0af4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/214a0af4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/214a0af4
Branch: refs/heads/0.8
Commit: 214a0af46b98aa9eaf54d0fbe982bc8ba2ae0a74
Parents: da7f146
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jan 17 09:36:01 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jan 17 09:36:01 2013 -0800
----------------------------------------------------------------------
.../kafka/consumer/ConsumerFetcherManager.scala | 8 +++-
.../kafka/consumer/ConsumerFetcherThread.scala | 7 +++-
.../scala/kafka/consumer/PartitionTopicInfo.scala | 9 +++-
.../consumer/ZookeeperConsumerConnector.scala | 31 ++-------------
.../scala/kafka/server/AbstractFetcherThread.scala | 21 +++++++---
.../scala/kafka/server/ReplicaFetcherThread.scala | 9 +++-
.../unit/kafka/consumer/ConsumerIteratorTest.scala | 1 -
.../consumer/ZookeeperConsumerConnectorTest.scala | 1 -
.../scala/unit/kafka/integration/FetcherTest.scala | 1 -
9 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index a6cbfb6..69c6b3e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -72,9 +72,13 @@ class ConsumerFetcherManager(private val consumerIdString: String,
leaderForPartitionsMap.foreach{
case(topicAndPartition, leaderBroker) =>
val pti = partitionMap(topicAndPartition)
- addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+ try {
+ addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+ noLeaderPartitionSet -= topicAndPartition
+ } catch {
+ case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+ }
}
- noLeaderPartitionSet --= leaderForPartitionsMap.keySet
shutdownIdleFetcherThreads()
} catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 713c7c9..1135f5d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread
import kafka.message.ByteBufferMessageSet
import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
import kafka.common.TopicAndPartition
+import kafka.common.ErrorMapping
class ConsumerFetcherThread(name: String,
@@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String,
case _ => startTimestamp = OffsetRequest.LatestTime
}
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
- val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+ val newOffset = partitionErrorAndOffset.error match {
+ case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+ case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+ }
val pti = partitionMap(topicAndPartition)
pti.resetFetchOffset(newOffset)
pti.resetConsumeOffset(newOffset)
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 6003cab..9792244 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -23,7 +23,6 @@ import kafka.message._
import kafka.utils.Logging
class PartitionTopicInfo(val topic: String,
- val brokerId: Int,
val partitionId: Int,
private val chunkQueue: BlockingQueue[FetchedDataChunk],
private val consumedOffset: AtomicLong,
@@ -70,7 +69,7 @@ class PartitionTopicInfo(val topic: String,
* Get the next fetch offset after this message set
*/
private def nextOffset(messages: ByteBufferMessageSet): Long = {
- var nextOffset = -1L
+ var nextOffset = PartitionTopicInfo.InvalidOffset
val iter = messages.shallowIterator
while(iter.hasNext)
nextOffset = iter.next.nextOffset
@@ -80,3 +79,9 @@ class PartitionTopicInfo(val topic: String,
override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
": consumed offset = " + consumedOffset.get
}
+
+object PartitionTopicInfo {
+ val InvalidOffset = -1L
+
+ def isOffsetInvalid(offset: Long) = offset < 0L
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 42a9628..c1f8513 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -402,18 +402,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val brokers = getAllBrokersInCluster(zkClient)
val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
- val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
topicsMetadata.foreach(m =>{
val topic = m.topic
val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
partitionsPerTopicMap.put(topic, partitions)
- m.partitionsMetadata.foreach(pmd =>{
- val partitionId = pmd.partitionId
- val leaderOpt = pmd.leader
- if(leaderOpt.isDefined)
- leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
- })
})
+
/**
* fetchers must be stopped to avoid data duplication, since if the current
* rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -456,7 +450,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
- addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
+ addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
}
@@ -573,39 +567,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}
private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
- leaderIdForPartitionsMap: Map[(String, Int), Int],
topicDirs: ZKGroupTopicDirs, partition: Int,
topic: String, consumerThreadId: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)
- // find the leader for this partition
- val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
- leaderOpt match {
- case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
- format(partition, topic))
- case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
- }
- val leader = leaderOpt.get
-
val znode = topicDirs.consumerOffsetDir + "/" + partition
val offsetString = readDataMaybeNull(zkClient, znode)._1
- // If first time starting a consumer, set the initial offset based on the config
+ // If first time starting a consumer, set the initial offset to -1
val offset =
offsetString match {
case Some(offsetStr) => offsetStr.toLong
- case None =>
- config.autoOffsetReset match {
- case OffsetRequest.SmallestTimeString =>
- SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
- case OffsetRequest.LargestTimeString =>
- SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
- }
+ case None => PartitionTopicInfo.InvalidOffset
}
val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
val consumedOffset = new AtomicLong(offset)
val fetchedOffset = new AtomicLong(offset)
val partTopicInfo = new PartitionTopicInfo(topic,
- leader,
partition,
queue,
consumedOffset,
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index bdb1d03..3cba743 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.cluster.Broker
-import kafka.consumer.SimpleConsumer
import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
import collection.mutable
import kafka.message.ByteBufferMessageSet
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong
import kafka.utils.{Pool, ShutdownableThread}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
+import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
/**
@@ -125,10 +125,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentOffset.get, partitionData)
case ErrorMapping.OffsetOutOfRangeCode =>
- val newOffset = handleOffsetOutOfRange(topicAndPartition)
- partitionMap.put(topicAndPartition, newOffset)
- warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
- .format(currentOffset.get, topic, partitionId, newOffset))
+ try {
+ val newOffset = handleOffsetOutOfRange(topicAndPartition)
+ partitionMap.put(topicAndPartition, newOffset)
+ warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+ .format(currentOffset.get, topic, partitionId, newOffset))
+ } catch {
+ case e =>
+ warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
+ partitionsWithError += topicAndPartition
+ }
case _ =>
warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
ErrorMapping.exceptionFor(partitionData.error))
@@ -150,7 +156,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
partitionMapLock.lock()
try {
- partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+ val topicPartition = TopicAndPartition(topic, partitionId)
+ partitionMap.put(
+ topicPartition,
+ if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
partitionMapCond.signalAll()
} finally {
partitionMapLock.unlock()
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6ae601e..79b3fa3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,10 +19,9 @@ package kafka.server
import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet
-import kafka.common.TopicAndPartition
+import kafka.common.{TopicAndPartition, ErrorMapping}
import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
-
class ReplicaFetcherThread(name:String,
sourceBroker: Broker,
brokerConfig: KafkaConfig,
@@ -64,7 +63,11 @@ class ReplicaFetcherThread(name:String,
replicaId = brokerConfig.brokerId,
requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
)
- val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+ val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+ val offset = partitionErrorAndOffset.error match {
+ case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+ case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+ }
val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
replica.log.get.truncateAndStartWithNewOffset(offset)
offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 0b5363f..8ae30ea 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
- c.brokerId,
0,
queue,
new AtomicLong(consumedOffset),
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e12f5a7..f7ee914 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(topic, topicRegistry.map(r => r._1).head)
val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
- assertEquals(0, brokerPartition.brokerId)
assertEquals(0, brokerPartition.partitionId)
// also check partition ownership
http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 61d9fc9..5a57bd1 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
- c.brokerId,
0,
queue,
new AtomicLong(0),