You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/17 18:36:32 UTC

git commit: Consumer rebalance fails if no leader available for a partition and stops all fetchers; patched by Maxime Brugidou; reviewed by Jun Rao; kafka-693

Updated Branches:
  refs/heads/0.8 da7f14676 -> 214a0af46


Consumer rebalance fails if no leader available for a partition and stops all fetchers; patched by Maxime Brugidou; reviewed by Jun Rao; kafka-693


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/214a0af4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/214a0af4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/214a0af4

Branch: refs/heads/0.8
Commit: 214a0af46b98aa9eaf54d0fbe982bc8ba2ae0a74
Parents: da7f146
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Jan 17 09:36:01 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Jan 17 09:36:01 2013 -0800

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala    |    8 +++-
 .../kafka/consumer/ConsumerFetcherThread.scala     |    7 +++-
 .../scala/kafka/consumer/PartitionTopicInfo.scala  |    9 +++-
 .../consumer/ZookeeperConsumerConnector.scala      |   31 ++-------------
 .../scala/kafka/server/AbstractFetcherThread.scala |   21 +++++++---
 .../scala/kafka/server/ReplicaFetcherThread.scala  |    9 +++-
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    1 -
 .../consumer/ZookeeperConsumerConnectorTest.scala  |    1 -
 .../scala/unit/kafka/integration/FetcherTest.scala |    1 -
 9 files changed, 44 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index a6cbfb6..69c6b3e 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -72,9 +72,13 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           leaderForPartitionsMap.foreach{
             case(topicAndPartition, leaderBroker) =>
               val pti = partitionMap(topicAndPartition)
-              addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+              try {
+                  addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+                  noLeaderPartitionSet -= topicAndPartition
+              } catch {
+                case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+              }
           }
-          noLeaderPartitionSet --= leaderForPartitionsMap.keySet
 
           shutdownIdleFetcherThreads()
         } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 713c7c9..1135f5d 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -22,6 +22,7 @@ import kafka.server.AbstractFetcherThread
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{PartitionOffsetRequestInfo, Request, OffsetRequest, FetchResponsePartitionData}
 import kafka.common.TopicAndPartition
+import kafka.common.ErrorMapping
 
 
 class ConsumerFetcherThread(name: String,
@@ -57,7 +58,11 @@ class ConsumerFetcherThread(name: String,
       case _ => startTimestamp = OffsetRequest.LatestTime
     }
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
-    val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val newOffset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
     val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
index 6003cab..9792244 100644
--- a/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -23,7 +23,6 @@ import kafka.message._
 import kafka.utils.Logging
 
 class PartitionTopicInfo(val topic: String,
-                         val brokerId: Int,
                          val partitionId: Int,
                          private val chunkQueue: BlockingQueue[FetchedDataChunk],
                          private val consumedOffset: AtomicLong,
@@ -70,7 +69,7 @@ class PartitionTopicInfo(val topic: String,
    * Get the next fetch offset after this message set
    */
   private def nextOffset(messages: ByteBufferMessageSet): Long = {
-    var nextOffset = -1L
+    var nextOffset = PartitionTopicInfo.InvalidOffset
     val iter = messages.shallowIterator
     while(iter.hasNext)
       nextOffset = iter.next.nextOffset
@@ -80,3 +79,9 @@ class PartitionTopicInfo(val topic: String,
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get
 }
+
+object PartitionTopicInfo {
+  val InvalidOffset = -1L
+
+  def isOffsetInvalid(offset: Long) = offset < 0L
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 42a9628..c1f8513 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -402,18 +402,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       val brokers = getAllBrokersInCluster(zkClient)
       val topicsMetadata = ClientUtils.fetchTopicMetadata(myTopicThreadIdsMap.keySet, brokers, config.clientId).topicsMetadata
       val partitionsPerTopicMap = new mutable.HashMap[String, Seq[Int]]
-      val leaderIdForPartitionsMap = new mutable.HashMap[(String, Int), Int]
       topicsMetadata.foreach(m =>{
         val topic = m.topic
         val partitions = m.partitionsMetadata.map(m1 => m1.partitionId)
         partitionsPerTopicMap.put(topic, partitions)
-        m.partitionsMetadata.foreach(pmd =>{
-          val partitionId = pmd.partitionId
-          val leaderOpt = pmd.leader
-          if(leaderOpt.isDefined)
-            leaderIdForPartitionsMap.put((topic, partitionId), leaderOpt.get.id)
-        })
       })
+
       /**
        * fetchers must be stopped to avoid data duplication, since if the current
        * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
@@ -456,7 +450,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             for (i <- startPart until startPart + nParts) {
               val partition = curPartitions(i)
               info(consumerThreadId + " attempting to claim partition " + partition)
-              addPartitionTopicInfo(currentTopicRegistry, leaderIdForPartitionsMap, topicDirs, partition, topic, consumerThreadId)
+              addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
               // record the partition ownership decision
               partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
             }
@@ -573,39 +567,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                      leaderIdForPartitionsMap: Map[(String, Int), Int],
                                       topicDirs: ZKGroupTopicDirs, partition: Int,
                                       topic: String, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
-      // find the leader for this partition
-      val leaderOpt = leaderIdForPartitionsMap.get((topic, partition))
-      leaderOpt match {
-        case None => throw new NoBrokersForPartitionException("No leader available for partition %d on topic %s".
-          format(partition, topic))
-        case Some(l) => debug("Leader for partition %d for topic %s is %d".format(partition, topic, l))
-      }
-      val leader = leaderOpt.get
-
       val znode = topicDirs.consumerOffsetDir + "/" + partition
       val offsetString = readDataMaybeNull(zkClient, znode)._1
-      // If first time starting a consumer, set the initial offset based on the config
+      // If first time starting a consumer, set the initial offset to -1
       val offset =
         offsetString match {
           case Some(offsetStr) => offsetStr.toLong
-          case None =>
-            config.autoOffsetReset match {
-              case OffsetRequest.SmallestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.EarliestTime, config.clientId)
-              case OffsetRequest.LargestTimeString =>
-                SimpleConsumer.earliestOrLatestOffset(zkClient, topic, leader, partition, OffsetRequest.LatestTime, config.clientId)
-            }
+          case None => PartitionTopicInfo.InvalidOffset
         }
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
       val partTopicInfo = new PartitionTopicInfo(topic,
-                                                 leader,
                                                  partition,
                                                  queue,
                                                  consumedOffset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index bdb1d03..3cba743 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.cluster.Broker
-import kafka.consumer.SimpleConsumer
 import kafka.common.{ClientIdAndBroker, TopicAndPartition, ErrorMapping}
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
@@ -30,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
+import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 
 
 /**
@@ -125,10 +125,16 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
                   // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
-                  val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  partitionMap.put(topicAndPartition, newOffset)
-                  warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
-                    .format(currentOffset.get, topic, partitionId, newOffset))
+                  try {
+                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                    partitionMap.put(topicAndPartition, newOffset)
+                    warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
+                      .format(currentOffset.get, topic, partitionId, newOffset))
+                  } catch {
+                    case e =>
+                      warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                      partitionsWithError += topicAndPartition
+                  }
                 case _ =>
                   warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
@@ -150,7 +156,10 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
     partitionMapLock.lock()
     try {
-      partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+      val topicPartition = TopicAndPartition(topic, partitionId)
+      partitionMap.put(
+          topicPartition,
+          if (PartitionTopicInfo.isOffsetInvalid(initialOffset)) handleOffsetOutOfRange(topicPartition) else initialOffset)
       partitionMapCond.signalAll()
     } finally {
       partitionMapLock.unlock()

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 6ae601e..79b3fa3 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -19,10 +19,9 @@ package kafka.server
 
 import kafka.cluster.Broker
 import kafka.message.ByteBufferMessageSet
-import kafka.common.TopicAndPartition
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.{FetchRequest, PartitionOffsetRequestInfo, OffsetRequest, FetchResponsePartitionData}
 
-
 class ReplicaFetcherThread(name:String,
                            sourceBroker: Broker,
                            brokerConfig: KafkaConfig,
@@ -64,7 +63,11 @@ class ReplicaFetcherThread(name:String,
       replicaId = brokerConfig.brokerId,
       requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1))
     )
-    val offset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
+    val partitionErrorAndOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition)
+    val offset = partitionErrorAndOffset.error match {
+      case ErrorMapping.NoError => partitionErrorAndOffset.offsets.head
+      case _ => throw ErrorMapping.exceptionFor(partitionErrorAndOffset.error)
+    }
     val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
     replica.log.get.truncateAndStartWithNewOffset(offset)
     offset

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 0b5363f..8ae30ea 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -50,7 +50,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness {
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, "localhost", c.port)))
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
                                                            0,
                                                            queue,
                                                            new AtomicLong(consumedOffset),

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index e12f5a7..f7ee914 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -312,7 +312,6 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(topic, topicRegistry.map(r => r._1).head)
     val topicsAndPartitionsInRegistry = topicRegistry.map(r => (r._1, r._2.map(p => p._2)))
     val brokerPartition = topicsAndPartitionsInRegistry.head._2.head
-    assertEquals(0, brokerPartition.brokerId)
     assertEquals(0, brokerPartition.partitionId)
 
     // also check partition ownership

http://git-wip-us.apache.org/repos/asf/kafka/blob/214a0af4/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 61d9fc9..5a57bd1 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -44,7 +44,6 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
                                                            0,
                                                            queue,
                                                            new AtomicLong(0),