Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/22 03:18:03 UTC

kafka git commit: KAFKA-6042: Avoid deadlock between two groups with delayed operations

Repository: kafka
Updated Branches:
  refs/heads/1.0 0fb01be23 -> 5ee157126


KAFKA-6042: Avoid deadlock between two groups with delayed operations

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4103 from rajinisivaram/KAFKA-6042-group-deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ee15712
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ee15712
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ee15712

Branch: refs/heads/1.0
Commit: 5ee157126d595b913761cf1887963460bbe12855
Parents: 0fb01be
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Oct 21 20:17:58 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Oct 21 20:17:58 2017 -0700

----------------------------------------------------------------------
 .../coordinator/group/DelayedHeartbeat.scala    |  2 +-
 .../kafka/coordinator/group/DelayedJoin.scala   |  4 +-
 .../coordinator/group/GroupCoordinator.scala    | 34 +++++++------
 .../kafka/coordinator/group/GroupMetadata.scala |  7 ++-
 .../group/GroupMetadataManager.scala            | 17 ++++---
 .../transaction/DelayedTxnMarker.scala          |  8 +--
 .../transaction/TransactionCoordinator.scala    |  8 +--
 .../TransactionMarkerChannelManager.scala       |  4 +-
 ...nsactionMarkerRequestCompletionHandler.scala |  2 +-
 .../transaction/TransactionMetadata.scala       |  8 ++-
 .../transaction/TransactionStateManager.scala   | 38 ++++++++++-----
 .../scala/kafka/server/DelayedOperation.scala   |  7 +--
 .../scala/kafka/server/DelayedProduce.scala     |  6 ++-
 .../scala/kafka/server/ReplicaManager.scala     |  4 +-
 .../group/GroupCoordinatorTest.scala            |  4 ++
 .../group/GroupMetadataManagerTest.scala        |  3 ++
 .../TransactionMarkerChannelManagerTest.scala   |  7 ++-
 .../TransactionStateManagerTest.scala           |  3 ++
 .../kafka/server/DelayedOperationTest.scala     | 51 ++++++++++++++++++--
 .../scala/unit/kafka/server/KafkaApisTest.scala |  3 ++
 20 files changed, 161 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
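
The deadlock this patch avoids is a lock-ordering cycle: a delayed operation
registered for one group could be completed by a thread that already held a
different group's lock, so two request-handler threads could block on each
other's group monitors. The fix threads the owning lock into DelayedOperation
itself, and an operation is completed only if that lock can be acquired. A
minimal sketch of the non-blocking completion step (an illustration of the
idea, not code from this patch):

    import java.util.concurrent.locks.Lock

    // Complete an operation under a caller-supplied lock, giving up (to be
    // retried later) instead of blocking when another thread holds the lock.
    def tryCompleteUnderLock(lock: Lock)(tryComplete: () => Boolean): Boolean = {
      if (lock.tryLock()) {
        try tryComplete()
        finally lock.unlock()
      } else
        false
    }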


http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 73d5d0f..5f16acb 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
                                       member: MemberMetadata,
                                       heartbeatDeadline: Long,
                                       sessionTimeout: Long)
-  extends DelayedOperation(sessionTimeout) {
+  extends DelayedOperation(sessionTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
   override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 5232287..c75c0d4 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -33,7 +33,7 @@ import scala.math.{max, min}
  */
 private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                  group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
+                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
 
   override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
   override def onExpiration() = coordinator.onExpireJoin()
@@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
   override def tryComplete(): Boolean = false
 
   override def onComplete(): Unit = {
-    group synchronized  {
+    group.inLock  {
       if (group.newMemberAdded && remainingMs != 0) {
         group.newMemberAdded = false
         val delay = min(configuredRebalanceDelay, remainingMs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bb59bcd..94622f6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -40,6 +40,12 @@ import scala.math.max
  *
  * Each Kafka server instantiates a coordinator which is responsible for a set of
  * groups. Groups are assigned to coordinators based on their group names.
+ * <p>
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in GroupCoordinator use `group` as the delayed operation
+ * lock. ReplicaManager.appendRecords may be invoked while holding the group lock
+ * used by its callback. The delayed callback may safely acquire the group lock,
+ * since the delayed operation is completed only if the group lock can be acquired.
  */
 class GroupCoordinator(val brokerId: Int,
                        val groupConfig: GroupConfig,
@@ -142,7 +148,7 @@ class GroupCoordinator(val brokerId: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
                           responseCallback: JoinCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
         // if the new member does not support the group protocol, reject it
         responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
@@ -248,7 +254,7 @@ class GroupCoordinator(val brokerId: Int,
                           memberId: String,
                           groupAssignment: Map[String, Array[Byte]],
                           responseCallback: SyncCallback) {
-    group synchronized {
+    group.inLock {
       if (!group.has(memberId)) {
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
       } else if (generationId != group.generationId) {
@@ -273,7 +279,7 @@ class GroupCoordinator(val brokerId: Int,
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
               groupManager.storeGroup(group, assignment, (error: Errors) => {
-                group synchronized {
+                group.inLock {
                   // another member may have joined the group while we were awaiting this callback,
                   // so we must ensure we are still in the AwaitingSync state and the same generation
                   // when it gets invoked. if we have transitioned to another state, then do nothing
@@ -317,7 +323,7 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             if (group.is(Dead) || !group.has(memberId)) {
               responseCallback(Errors.UNKNOWN_MEMBER_ID)
             } else {
@@ -349,7 +355,7 @@ class GroupCoordinator(val brokerId: Int,
           responseCallback(Errors.UNKNOWN_MEMBER_ID)
 
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             group.currentState match {
               case Dead =>
                 // if the group is marked as dead, it means some other thread has just removed the group
@@ -449,7 +455,7 @@ class GroupCoordinator(val brokerId: Int,
                               producerEpoch: Short,
                               offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                               responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    group synchronized {
+    group.inLock {
       if (group.is(Dead)) {
         responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
       } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
@@ -506,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
       groupManager.getGroup(groupId) match {
         case None => (Errors.NONE, GroupCoordinator.DeadGroup)
         case Some(group) =>
-          group synchronized {
+          group.inLock {
             (Errors.NONE, group.summary)
           }
       }
@@ -529,7 +535,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupUnloaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
       val previousState = group.currentState
       group.transitionTo(Dead)
@@ -558,7 +564,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def onGroupLoaded(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
       assert(group.is(Stable) || group.is(Empty))
       group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
@@ -666,7 +672,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def maybePrepareRebalance(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       if (group.canRebalance)
         prepareRebalance(group)
     }
@@ -706,7 +712,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (group.notYetRejoinedMembers.isEmpty)
         forceComplete()
       else false
@@ -718,7 +724,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onCompleteJoin(group: GroupMetadata) {
-    group synchronized {
+    group.inLock {
       // remove any members who haven't joined the group yet
       group.notYetRejoinedMembers.foreach { failedMember =>
         group.remove(failedMember.memberId)
@@ -768,7 +774,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
-    group synchronized {
+    group.inLock {
       if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
         forceComplete()
       else false
@@ -776,7 +782,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
-    group synchronized {
+    group.inLock {
       if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
         info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
         removeMemberAndUpdateGroup(group, member)
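
The locking notes above rely on the group lock being a ReentrantLock: the
thread that completes a delayed operation already holds group.lock when the
produce response callback runs, and the callback may take the same lock again.
A minimal demonstration of that JDK property (nothing Kafka-specific):

    import java.util.concurrent.locks.ReentrantLock

    val lock = new ReentrantLock
    lock.lock()                  // outer acquisition, e.g. the delayed operation
    try {
      lock.lock()                // inner acquisition by the callback: no deadlock
      try println(s"hold count = ${lock.getHoldCount}")   // prints: hold count = 2
      finally lock.unlock()
    } finally lock.unlock()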

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 18096bb..537d944 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -17,9 +17,10 @@
 package kafka.coordinator.group
 
 import java.util.UUID
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.common.OffsetAndMetadata
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, immutable, mutable}
@@ -154,6 +155,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   private var state: GroupState = initialState
 
+  private[group] val lock = new ReentrantLock
+
   private val members = new mutable.HashMap[String, MemberMetadata]
 
   private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
@@ -172,6 +175,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   var protocol: String = null
   var newMemberAdded: Boolean = false
 
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def is(groupState: GroupState) = state == groupState
   def not(groupState: GroupState) = state != groupState
   def has(memberId: String) = members.contains(memberId)
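
The new inLock helper delegates to CoreUtils.inLock, which is essentially the
standard lock/try/finally wrapper (paraphrased here for reference):

    import java.util.concurrent.locks.Lock

    def inLock[T](lock: Lock)(fun: => T): T = {
      lock.lock()
      try fun
      finally lock.unlock()
    }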

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7519dc4..0298509 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -85,7 +85,7 @@ class GroupMetadataManager(brokerId: Int,
   newGauge("NumOffsets",
     new Gauge[Int] {
       def value = groupMetadataCache.values.map(group => {
-        group synchronized { group.numOffsets }
+        group.inLock { group.numOffsets }
       }).sum
     }
   )
@@ -243,6 +243,7 @@ class GroupMetadataManager(brokerId: Int,
       internalTopicsAllowed = true,
       isFromClient = false,
       entriesPerPartition = records,
+      delayedProduceLock = Some(group.lock),
       responseCallback = callback)
   }
 
@@ -260,7 +261,7 @@ class GroupMetadataManager(brokerId: Int,
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
-    group synchronized {
+    group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
@@ -309,7 +310,7 @@ class GroupMetadataManager(brokerId: Int,
             // the offset and metadata to cache if the append status has no error
             val status = responseStatus(offsetTopicPartition)
 
-            val responseError = group synchronized {
+            val responseError = group.inLock {
               if (status.error == Errors.NONE) {
                 if (!group.is(Dead)) {
                   filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
@@ -369,12 +370,12 @@ class GroupMetadataManager(brokerId: Int,
           }
 
           if (isTxnOffsetCommit) {
-            group synchronized {
+            group.inLock {
               addProducerGroup(producerId, group.groupId)
               group.prepareTxnOffsetCommit(producerId, offsetMetadata)
             }
           } else {
-            group synchronized {
+            group.inLock {
               group.prepareOffsetCommit(offsetMetadata)
             }
           }
@@ -403,7 +404,7 @@ class GroupMetadataManager(brokerId: Int,
         (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
       }.toMap
     } else {
-      group synchronized {
+      group.inLock {
         if (group.is(Dead)) {
           topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
             (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
@@ -675,7 +676,7 @@ class GroupMetadataManager(brokerId: Int,
     var offsetsRemoved = 0
 
     groupMetadataCache.foreach { case (groupId, group) =>
-      val (removedOffsets, groupIsDead, generation) = group synchronized {
+      val (removedOffsets, groupIsDead, generation) = group.inLock {
         val removedOffsets = deletedTopicPartitions match {
           case Some(topicPartitions) => group.removeOffsets(topicPartitions)
           case None => group.removeExpiredOffsets(startMs)
@@ -747,7 +748,7 @@ class GroupMetadataManager(brokerId: Int,
     val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
     pendingGroups.foreach { case (groupId) =>
       getGroup(groupId) match {
-        case Some(group) => group synchronized {
+        case Some(group) => group.inLock {
           if (!group.is(Dead)) {
             group.completePendingTxnOffsetCommit(producerId, isCommit)
             removeProducerGroup(producerId, groupId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index bc0f1b7..cf18b81 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import kafka.server.DelayedOperation
 import org.apache.kafka.common.protocol.Errors
@@ -25,11 +26,12 @@ import org.apache.kafka.common.protocol.Errors
   * Delayed transaction state change operations that are added to the purgatory without timeout (i.e. these operations should never time out)
   */
 private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
-                                           completionCallback: Errors => Unit)
-  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
+                                           completionCallback: Errors => Unit,
+                                           lock: Lock)
+  extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {
 
   override def tryComplete(): Boolean = {
-    txnMetadata synchronized {
+    txnMetadata.inLock {
       if (txnMetadata.topicPartitions.isEmpty)
         forceComplete()
       else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 6b9b7ef..0b38dbc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -137,7 +137,7 @@ class TransactionCoordinator(brokerId: Int,
           val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
           val txnMetadata = existingEpochAndMetadata.transactionMetadata
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
           }
       }
@@ -241,7 +241,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndMetadata.transactionMetadata
 
           // generate the new transaction metadata with added partitions
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
@@ -304,7 +304,7 @@ class TransactionCoordinator(brokerId: Int,
           val txnMetadata = epochAndTxnMetadata.transactionMetadata
           val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
 
-          txnMetadata synchronized {
+          txnMetadata.inLock {
             if (txnMetadata.producerId != producerId)
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (txnMetadata.producerEpoch != producerEpoch)
@@ -368,7 +368,7 @@ class TransactionCoordinator(brokerId: Int,
                 case Some(epochAndMetadata) =>
                   if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
                     val txnMetadata = epochAndMetadata.transactionMetadata
-                    txnMetadata synchronized {
+                    txnMetadata.inLock {
                       if (txnMetadata.producerId != producerId)
                         Left(Errors.INVALID_PRODUCER_ID_MAPPING)
                       else if (txnMetadata.producerEpoch != producerEpoch)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 6c13de4..4fc9db2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -263,7 +263,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
       }
     }
 
-    val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
+    val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
     txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
 
     addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
@@ -340,7 +340,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 
                 val txnMetadata = epochAndMetadata.transactionMetadata
 
-                txnMetadata synchronized {
+                txnMetadata.inLock {
                   topicPartitions.foreach(txnMetadata.removePartition)
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index bfa25be..fefe767 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -129,7 +129,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
               txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
               abortSending = true
             } else {
-              txnMetadata synchronized {
+              txnMetadata.inLock {
                 for ((topicPartition, error) <- errors.asScala) {
                   error match {
                     case Errors.NONE =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 405e639..486a887 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -16,7 +16,9 @@
  */
 package kafka.coordinator.transaction
 
-import kafka.utils.{Logging, nonthreadsafe}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.RecordBatch
 
@@ -156,6 +158,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
   // initialized as the same as the current state
   var pendingState: Option[TransactionState] = None
 
+  private[transaction] val lock = new ReentrantLock
+
+  def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
   def addPartitions(partitions: collection.Set[TopicPartition]): Unit = {
     topicPartitions ++= partitions
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index ad5d33b..f2e25c4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -56,6 +56,16 @@ object TransactionStateManager {
  * 1. the transaction log, which is a special internal topic.
  * 2. the transaction metadata including its ongoing transaction status.
  * 3. the background expiration of the transaction as well as the transactional id.
+ *
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in TransactionStateManager use `stateLock.readLock` as the delayed operation
+ * lock. Delayed operations are completed only if `stateLock.readLock` can be acquired.
+ * Delayed callbacks may acquire `stateLock.readLock` or any of the `txnMetadata` locks.
+ * <ul>
+ * <li>`stateLock.readLock` must never be acquired while holding `txnMetadata` lock.</li>
+ * <li>`txnMetadata` lock must never be acquired while holding `stateLock.writeLock`.</li>
+ * <li>`ReplicaManager.appendRecords` should never be invoked while holding a `txnMetadata` lock.</li>
+ * </ul>
  */
 class TransactionStateManager(brokerId: Int,
                               zkUtils: ZkUtils,
@@ -95,6 +105,7 @@ class TransactionStateManager(brokerId: Int,
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
   }
+  private[transaction] def stateReadLock = stateLock.readLock
 
  // this is a best-effort expiration of an ongoing transaction that has been open for longer than its
  // txn timeout value; we do not need to grab the lock on the metadata object when checking its state
@@ -136,7 +147,7 @@ class TransactionStateManager(brokerId: Int,
             }.filter { case (_, txnMetadata) =>
               txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
             }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata synchronized {
+              val txnMetadataTransition = txnMetadata.inLock {
                 txnMetadata.prepareDead()
               }
               TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
@@ -166,7 +177,7 @@ class TransactionStateManager(brokerId: Int,
                     .foreach { txnMetadataCacheEntry =>
                       toRemove.foreach { idCoordinatorEpochAndMetadata =>
                         val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
-                        txnMetadata synchronized {
+                        txnMetadata.inLock {
                           if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
                             && txnMetadata.pendingState.contains(Dead)
                             && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
@@ -196,7 +207,8 @@ class TransactionStateManager(brokerId: Int,
           internalTopicsAllowed = true,
           isFromClient = false,
           recordsPerPartition,
-          removeFromCacheCallback
+          removeFromCacheCallback,
+          Some(stateLock.readLock)
         )
       }
 
@@ -376,7 +388,7 @@ class TransactionStateManager(brokerId: Int,
           val transactionsPendingForCompletion = new mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata]
           loadedTransactions.foreach {
             case (transactionalId, txnMetadata) =>
-              txnMetadata synchronized {
+              txnMetadata.inLock {
                 // if state is PrepareCommit or PrepareAbort we need to complete the transaction
                 txnMetadata.state match {
                   case PrepareAbort =>
@@ -509,7 +521,7 @@ class TransactionStateManager(brokerId: Int,
           case Right(Some(epochAndMetadata)) =>
             val metadata = epochAndMetadata.transactionMetadata
 
-            metadata synchronized {
+            metadata.inLock {
               if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                 // the cache may have been changed due to txn topic partition emigration and immigration,
                 // in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator
@@ -536,7 +548,7 @@ class TransactionStateManager(brokerId: Int,
         getTransactionState(transactionalId) match {
           case Right(Some(epochAndTxnMetadata)) =>
             val metadata = epochAndTxnMetadata.transactionMetadata
-            metadata synchronized {
+            metadata.inLock {
               if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
                 if (retryOnError(responseError)) {
                   info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
@@ -586,24 +598,28 @@ class TransactionStateManager(brokerId: Int,
         case Right(Some(epochAndMetadata)) =>
           val metadata = epochAndMetadata.transactionMetadata
 
-          metadata synchronized {
+          val append: Boolean = metadata.inLock {
             if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
              // the coordinator epoch has changed, reply to client immediately with NOT_COORDINATOR
               responseCallback(Errors.NOT_COORDINATOR)
+              false
             } else {
               // do not need to check the metadata object itself since no concurrent thread should be able to modify it
               // under the same coordinator epoch, so directly append to txn log now
-
-              replicaManager.appendRecords(
+              true
+            }
+          }
+          if (append) {
+            replicaManager.appendRecords(
                 newMetadata.txnTimeoutMs.toLong,
                 TransactionLog.EnforcedRequiredAcks,
                 internalTopicsAllowed = true,
                 isFromClient = false,
                 recordsPerPartition,
-                updateCacheCallback)
+                updateCacheCallback,
+                delayedProduceLock = Some(stateLock.readLock))
 
               trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
-            }
           }
       }
     }
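
The last hunk above restructures the append path so that the coordinator-epoch
check runs under metadata.inLock while ReplicaManager.appendRecords runs only
after the lock is released, per the rule that appendRecords must never be
invoked while holding a txnMetadata lock. The shape of that refactor, reduced
to its essentials (a simplified sketch; doAppend stands in for the
appendRecords call):

    import java.util.concurrent.locks.ReentrantLock

    val lock = new ReentrantLock

    // Decide under the lock, perform the I/O outside it.
    def appendIfCurrentEpoch(current: Int, expected: Int)(doAppend: () => Unit): Unit = {
      lock.lock()
      val append = try { current == expected } finally lock.unlock()
      if (append) doAppend()   // no metadata lock held here
    }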

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 86bf1ff..894d30e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
@@ -43,11 +43,12 @@ import scala.collection.mutable.ListBuffer
  *
  * A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
  */
-abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
+abstract class DelayedOperation(override val delayMs: Long,
+    lockOpt: Option[Lock] = None) extends TimerTask with Logging {
 
   private val completed = new AtomicBoolean(false)
   // Visible for testing
-  private[server] val lock: ReentrantLock = new ReentrantLock
+  private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
 
   /*
    * Force completing the delayed operation, if not already completed.
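
Subclasses opt into the shared lock through the new constructor parameter, as
DelayedHeartbeat, DelayedJoin, DelayedTxnMarker and DelayedProduce do elsewhere
in this patch. A hypothetical subclass showing the contract:

    import java.util.concurrent.locks.Lock
    import kafka.server.DelayedOperation

    // Hypothetical example, not part of the patch: completion is serialized
    // on the owning component's lock instead of a private ReentrantLock.
    class DelayedExample(delayMs: Long, sharedLock: Lock)
      extends DelayedOperation(delayMs, Some(sharedLock)) {

      @volatile var done = false

      override def tryComplete(): Boolean = if (done) forceComplete() else false
      override def onComplete(): Unit = info("completed")    // runs with sharedLock held
      override def onExpiration(): Unit = info("expired")
    }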

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index ebbd9ee..718ed24 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Meter
 import kafka.metrics.KafkaMetricsGroup
@@ -54,8 +55,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
 class DelayedProduce(delayMs: Long,
                      produceMetadata: ProduceMetadata,
                      replicaManager: ReplicaManager,
-                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
-  extends DelayedOperation(delayMs) {
+                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                     lockOpt: Option[Lock] = None)
+  extends DelayedOperation(delayMs, lockOpt) {
 
   // first update the acks pending variable according to the error code
   produceMetadata.produceStatus.foreach { case (topicPartition, status) =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e5c9432..43ecc44 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.locks.Lock
 
 import com.yammer.metrics.core.Gauge
 import kafka.api._
@@ -450,6 +451,7 @@ class ReplicaManager(val config: KafkaConfig,
                     isFromClient: Boolean,
                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
                     responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+                    delayedProduceLock: Option[Lock] = None,
                     processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
     if (isValidRequiredAcks(requiredAcks)) {
       val sTime = time.milliseconds
@@ -469,7 +471,7 @@ class ReplicaManager(val config: KafkaConfig,
       if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
         // create delayed produce operation
         val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
-        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
 
         // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
         val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3fed45d..7075c63 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
 import org.apache.kafka.common.internals.Topic
 import org.junit.Assert._
@@ -1367,6 +1368,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
         Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1450,6 +1452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
@@ -1479,6 +1482,7 @@ class GroupCoordinatorTest extends JUnitSuite {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 46a1878..78a5eaa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.internals.Topic
 
 import scala.collection.JavaConverters._
 import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
 
 class GroupMetadataManagerTest {
 
@@ -1306,6 +1307,7 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     )
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1320,6 +1322,7 @@ class GroupMetadataManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 0bc1c9f..a039c53 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,6 +16,8 @@
  */
 package kafka.coordinator.transaction
 
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
 import kafka.utils.timer.MockTimer
 import kafka.utils.TestUtils
@@ -86,7 +88,10 @@ class TransactionMarkerChannelManagerTest {
     EasyMock.expect(txnStateManager.getTransactionState(EasyMock.eq(transactionalId2)))
       .andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
       .anyTimes()
-
+    val stateLock = new ReentrantReadWriteLock
+    EasyMock.expect(txnStateManager.stateReadLock)
+      .andReturn(stateLock.readLock)
+      .anyTimes()
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0a2b641..7973b9a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -17,6 +17,7 @@
 package kafka.coordinator.transaction
 
 import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.log.Log
 import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
@@ -498,6 +499,7 @@ class TransactionStateManagerTest {
           EasyMock.eq(false),
           EasyMock.eq(recordsByPartition),
           EasyMock.capture(capturedArgument),
+          EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
           EasyMock.anyObject()
         )).andAnswer(new IAnswer[Unit] {
           override def answer(): Unit = {
@@ -598,6 +600,7 @@ class TransactionStateManagerTest {
       isFromClient = EasyMock.eq(false),
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
       EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
       EasyMock.anyObject())
     ).andAnswer(new IAnswer[Unit] {
         override def answer(): Unit = capturedArgument.getValue.apply(

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 5b1daff..d4d79e5 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util.concurrent.{Executors, Future}
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.utils.CoreUtils.inLock
 
@@ -123,12 +124,28 @@ class DelayedOperationTest {
 
   @Test
   def testDelayedOperationLock() {
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false)
+  }
+
+  @Test
+  def testDelayedOperationLockOverride() {
+    def newMockOperation = {
+      val lock = new ReentrantLock
+      new MockDelayedOperation(100000L, Some(lock), Some(lock))
+    }
+    verifyDelayedOperationLock(newMockOperation, mismatchedLocks = false)
+
+    verifyDelayedOperationLock(new MockDelayedOperation(100000L, None, Some(new ReentrantLock)),
+        mismatchedLocks = true)
+  }
+
+  def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean) {
     val key = "key"
     val executorService = Executors.newSingleThreadExecutor
     try {
       def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           purgatory.tryCompleteElseWatch(op, Seq(key))
           assertFalse("Not completable", op.isCompleted)
           op
@@ -137,7 +154,7 @@ class DelayedOperationTest {
 
       def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
         (1 to count).map { _ =>
-          val op = new MockDelayedOperation(100000L)
+          val op = mockDelayedOperation
           op.completable = true
           op
         }
@@ -181,6 +198,27 @@ class DelayedOperationTest {
         checkAndComplete(ops, Seq(ops(1)))
       } finally {
         runOnAnotherThread(ops(0).lock.unlock(), true)
+        checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+      }
+
+      // Lock acquired by the response callback is held by another thread: completion should not block.
+      // If the response lock is also used as the operation lock, only operations
+      // whose lock can be acquired without blocking on the current thread should complete.
+      ops = createDelayedOperations(2)
+      ops(0).responseLockOpt.foreach { lock =>
+        runOnAnotherThread(lock.lock(), true)
+        try {
+          try {
+            checkAndComplete(ops, Seq(ops(1)))
+            assertFalse("Should have failed with mismatched locks", mismatchedLocks)
+          } catch {
+            case e: IllegalStateException =>
+              assertTrue("Should not have failed with valid locks", mismatchedLocks)
+          }
+        } finally {
+          runOnAnotherThread(lock.unlock(), true)
+          checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+        }
       }
 
       // Immediately completable operations should complete without locking
@@ -196,8 +234,9 @@ class DelayedOperationTest {
   }
 
 
-  class MockDelayedOperation(delayMs: Long)
-    extends DelayedOperation(delayMs) {
+  class MockDelayedOperation(delayMs: Long,
+      lockOpt: Option[ReentrantLock] = None,
+      val responseLockOpt: Option[ReentrantLock] = None) extends DelayedOperation(delayMs, lockOpt) {
     var completable = false
 
     def awaitExpiration() {
@@ -218,6 +257,10 @@ class DelayedOperationTest {
     }
 
     override def onComplete() {
+      responseLockOpt.foreach { lock =>
+        if (!lock.tryLock())
+          throw new IllegalStateException("Response callback lock could not be acquired in callback")
+      }
       synchronized {
         notify()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 76ae35b..508bc35 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -180,6 +180,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -218,6 +219,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.capture(responseCallback),
+      EasyMock.anyObject(),
       EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
       override def answer(): Unit = {
         responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -248,6 +250,7 @@ class KafkaApisTest {
       EasyMock.eq(false),
       EasyMock.anyObject(),
       EasyMock.anyObject(),
+      EasyMock.anyObject(),
       EasyMock.anyObject()))
 
     EasyMock.replay(replicaManager)