You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/11/17 02:38:15 UTC

svn commit: r1410665 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ConsumerFetcherManager.scala consumer/ConsumerFetcherThread.scala server/AbstractFetcherThread.scala

Author: jjkoshy
Date: Sat Nov 17 01:38:14 2012
New Revision: 1410665

URL: http://svn.apache.org/viewvc?rev=1410665&view=rev
Log:
Fix deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-618
pre-commit-status-crumb=5e65bf7a-f347-4600-b3ae-99eed1cd2a78

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Nov 17 01:38:14 2012
@@ -88,7 +88,9 @@ class ConsumerFetcherManager(private val
 
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
+    new ConsumerFetcherThread(
+      "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+      config, sourceBroker, partitionMap, this)
   }
 
   def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
@@ -106,29 +108,15 @@ class ConsumerFetcherManager(private val
   }
 
   def stopAllConnections() {
-    // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
     lock.lock()
+    // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
     noLeaderPartitionSet.clear()
-    lock.unlock()
-
-    // second, stop all existing fetchers
-    closeAllFetchers()
-
-    // finally clear partitionMap
-    lock.lock()
+    // second, clear partitionMap
     partitionMap = null
     lock.unlock()
-  }
 
-  def getPartitionTopicInfo(topicAndPartition: TopicAndPartition) : PartitionTopicInfo = {
-    var pti :PartitionTopicInfo =null
-    lock.lock()
-    try {
-      pti = partitionMap(topicAndPartition)
-    } finally {
-      lock.unlock()
-    }
-    pti      
+    // third, stop all existing fetchers
+    closeAllFetchers()
   }
 
   def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -27,6 +27,7 @@ import kafka.common.TopicAndPartition
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
+                            partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                             val consumerFetcherManager: ConsumerFetcherManager)
         extends AbstractFetcherThread(name = name, 
                                       clientId = config.clientId,
@@ -40,7 +41,7 @@ class ConsumerFetcherThread(name: String
 
   // process fetched data
   def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
-    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+    val pti = partitionMap(topicAndPartition)
     if (pti.getFetchOffset != fetchOffset)
       throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
                                 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
@@ -57,7 +58,7 @@ class ConsumerFetcherThread(name: String
     }
     val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
     val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
-    val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+    val pti = partitionMap(topicAndPartition)
     pti.resetFetchOffset(newOffset)
     pti.resetConsumeOffset(newOffset)
     newOffset

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-abstract class  AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {