You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/11/17 02:38:15 UTC
svn commit: r1410665 - in
/incubator/kafka/branches/0.8/core/src/main/scala/kafka:
consumer/ConsumerFetcherManager.scala consumer/ConsumerFetcherThread.scala
server/AbstractFetcherThread.scala
Author: jjkoshy
Date: Sat Nov 17 01:38:14 2012
New Revision: 1410665
URL: http://svn.apache.org/viewvc?rev=1410665&view=rev
Log:
Fix deadlock between leader-finder-thread and consumer-fetcher-thread during broker failure; patched by Joel Koshy; reviewed by Jun Rao; KAFKA-618
pre-commit-status-crumb=5e65bf7a-f347-4600-b3ae-99eed1cd2a78
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala Sat Nov 17 01:38:14 2012
@@ -88,7 +88,9 @@ class ConsumerFetcherManager(private val
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ConsumerFetcherThread("ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), config, sourceBroker, this)
+ new ConsumerFetcherThread(
+ "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
+ config, sourceBroker, partitionMap, this)
}
def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
@@ -106,29 +108,15 @@ class ConsumerFetcherManager(private val
}
def stopAllConnections() {
- // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
lock.lock()
+ // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread
noLeaderPartitionSet.clear()
- lock.unlock()
-
- // second, stop all existing fetchers
- closeAllFetchers()
-
- // finally clear partitionMap
- lock.lock()
+ // second, clear partitionMap
partitionMap = null
lock.unlock()
- }
- def getPartitionTopicInfo(topicAndPartition: TopicAndPartition) : PartitionTopicInfo = {
- var pti :PartitionTopicInfo =null
- lock.lock()
- try {
- pti = partitionMap(topicAndPartition)
- } finally {
- lock.unlock()
- }
- pti
+ // third, stop all existing fetchers
+ closeAllFetchers()
}
def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -27,6 +27,7 @@ import kafka.common.TopicAndPartition
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
sourceBroker: Broker,
+ partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
val consumerFetcherManager: ConsumerFetcherManager)
extends AbstractFetcherThread(name = name,
clientId = config.clientId,
@@ -40,7 +41,7 @@ class ConsumerFetcherThread(name: String
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
- val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+ val pti = partitionMap(topicAndPartition)
if (pti.getFetchOffset != fetchOffset)
throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
.format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
@@ -57,7 +58,7 @@ class ConsumerFetcherThread(name: String
}
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(startTimestamp, 1)))
val newOffset = simpleConsumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head
- val pti = consumerFetcherManager.getPartitionTopicInfo(topicAndPartition)
+ val pti = partitionMap(topicAndPartition)
pti.resetFetchOffset(newOffset)
pti.resetConsumeOffset(newOffset)
newOffset
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1410665&r1=1410664&r2=1410665&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Sat Nov 17 01:38:14 2012
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit
/**
* Abstract class for fetching data from multiple partitions from the same broker.
*/
-abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
+abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
extends ShutdownableThread(name) {