You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/22 03:18:03 UTC
kafka git commit: KAFKA-6042: Avoid deadlock between two groups with
delayed operations
Repository: kafka
Updated Branches:
refs/heads/1.0 0fb01be23 -> 5ee157126
KAFKA-6042: Avoid deadlock between two groups with delayed operations
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4103 from rajinisivaram/KAFKA-6042-group-deadlock
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ee15712
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ee15712
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ee15712
Branch: refs/heads/1.0
Commit: 5ee157126d595b913761cf1887963460bbe12855
Parents: 0fb01be
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Sat Oct 21 20:17:58 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Sat Oct 21 20:17:58 2017 -0700
----------------------------------------------------------------------
.../coordinator/group/DelayedHeartbeat.scala | 2 +-
.../kafka/coordinator/group/DelayedJoin.scala | 4 +-
.../coordinator/group/GroupCoordinator.scala | 34 +++++++------
.../kafka/coordinator/group/GroupMetadata.scala | 7 ++-
.../group/GroupMetadataManager.scala | 17 ++++---
.../transaction/DelayedTxnMarker.scala | 8 +--
.../transaction/TransactionCoordinator.scala | 8 +--
.../TransactionMarkerChannelManager.scala | 4 +-
...nsactionMarkerRequestCompletionHandler.scala | 2 +-
.../transaction/TransactionMetadata.scala | 8 ++-
.../transaction/TransactionStateManager.scala | 38 ++++++++++-----
.../scala/kafka/server/DelayedOperation.scala | 7 +--
.../scala/kafka/server/DelayedProduce.scala | 6 ++-
.../scala/kafka/server/ReplicaManager.scala | 4 +-
.../group/GroupCoordinatorTest.scala | 4 ++
.../group/GroupMetadataManagerTest.scala | 3 ++
.../TransactionMarkerChannelManagerTest.scala | 7 ++-
.../TransactionStateManagerTest.scala | 3 ++
.../kafka/server/DelayedOperationTest.scala | 51 ++++++++++++++++++--
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 ++
20 files changed, 161 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
index 73d5d0f..5f16acb 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedHeartbeat.scala
@@ -28,7 +28,7 @@ private[group] class DelayedHeartbeat(coordinator: GroupCoordinator,
member: MemberMetadata,
heartbeatDeadline: Long,
sessionTimeout: Long)
- extends DelayedOperation(sessionTimeout) {
+ extends DelayedOperation(sessionTimeout, Some(group.lock)) {
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _)
override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index 5232287..c75c0d4 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -33,7 +33,7 @@ import scala.math.{max, min}
*/
private[group] class DelayedJoin(coordinator: GroupCoordinator,
group: GroupMetadata,
- rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout) {
+ rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
override def onExpiration() = coordinator.onExpireJoin()
@@ -58,7 +58,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
override def tryComplete(): Boolean = false
override def onComplete(): Unit = {
- group synchronized {
+ group.inLock {
if (group.newMemberAdded && remainingMs != 0) {
group.newMemberAdded = false
val delay = min(configuredRebalanceDelay, remainingMs)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index bb59bcd..94622f6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -40,6 +40,12 @@ import scala.math.max
*
* Each Kafka server instantiates a coordinator which is responsible for a set of
* groups. Groups are assigned to coordinators based on their group names.
+ * <p>
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in GroupCoordinator use `group` as the delayed operation
+ * lock. ReplicaManager.appendRecords may be invoked while holding the group lock
+ * used by its callback. The delayed callback may acquire the group lock
+ * since the delayed operation is completed only if the group lock can be acquired.
*/
class GroupCoordinator(val brokerId: Int,
val groupConfig: GroupConfig,
@@ -142,7 +148,7 @@ class GroupCoordinator(val brokerId: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
- group synchronized {
+ group.inLock {
if (!group.is(Empty) && (!group.protocolType.contains(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
// if the new member does not support the group protocol, reject it
responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
@@ -248,7 +254,7 @@ class GroupCoordinator(val brokerId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback) {
- group synchronized {
+ group.inLock {
if (!group.has(memberId)) {
responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
} else if (generationId != group.generationId) {
@@ -273,7 +279,7 @@ class GroupCoordinator(val brokerId: Int,
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
groupManager.storeGroup(group, assignment, (error: Errors) => {
- group synchronized {
+ group.inLock {
// another member may have joined the group while we were awaiting this callback,
// so we must ensure we are still in the AwaitingSync state and the same generation
// when it gets invoked. if we have transitioned to another state, then do nothing
@@ -317,7 +323,7 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) =>
- group synchronized {
+ group.inLock {
if (group.is(Dead) || !group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID)
} else {
@@ -349,7 +355,7 @@ class GroupCoordinator(val brokerId: Int,
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) =>
- group synchronized {
+ group.inLock {
group.currentState match {
case Dead =>
// if the group is marked as dead, it means some other thread has just removed the group
@@ -449,7 +455,7 @@ class GroupCoordinator(val brokerId: Int,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
- group synchronized {
+ group.inLock {
if (group.is(Dead)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
} else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
@@ -506,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
groupManager.getGroup(groupId) match {
case None => (Errors.NONE, GroupCoordinator.DeadGroup)
case Some(group) =>
- group synchronized {
+ group.inLock {
(Errors.NONE, group.summary)
}
}
@@ -529,7 +535,7 @@ class GroupCoordinator(val brokerId: Int,
}
private def onGroupUnloaded(group: GroupMetadata) {
- group synchronized {
+ group.inLock {
info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
val previousState = group.currentState
group.transitionTo(Dead)
@@ -558,7 +564,7 @@ class GroupCoordinator(val brokerId: Int,
}
private def onGroupLoaded(group: GroupMetadata) {
- group synchronized {
+ group.inLock {
info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
assert(group.is(Stable) || group.is(Empty))
group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
@@ -666,7 +672,7 @@ class GroupCoordinator(val brokerId: Int,
}
private def maybePrepareRebalance(group: GroupMetadata) {
- group synchronized {
+ group.inLock {
if (group.canRebalance)
prepareRebalance(group)
}
@@ -706,7 +712,7 @@ class GroupCoordinator(val brokerId: Int,
}
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
- group synchronized {
+ group.inLock {
if (group.notYetRejoinedMembers.isEmpty)
forceComplete()
else false
@@ -718,7 +724,7 @@ class GroupCoordinator(val brokerId: Int,
}
def onCompleteJoin(group: GroupMetadata) {
- group synchronized {
+ group.inLock {
// remove any members who haven't joined the group yet
group.notYetRejoinedMembers.foreach { failedMember =>
group.remove(failedMember.memberId)
@@ -768,7 +774,7 @@ class GroupCoordinator(val brokerId: Int,
}
def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
- group synchronized {
+ group.inLock {
if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
forceComplete()
else false
@@ -776,7 +782,7 @@ class GroupCoordinator(val brokerId: Int,
}
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
- group synchronized {
+ group.inLock {
if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
removeMemberAndUpdateGroup(group, member)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 18096bb..537d944 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -17,9 +17,10 @@
package kafka.coordinator.group
import java.util.UUID
+import java.util.concurrent.locks.ReentrantLock
import kafka.common.OffsetAndMetadata
-import kafka.utils.{Logging, nonthreadsafe}
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import org.apache.kafka.common.TopicPartition
import scala.collection.{Seq, immutable, mutable}
@@ -154,6 +155,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private var state: GroupState = initialState
+ private[group] val lock = new ReentrantLock
+
private val members = new mutable.HashMap[String, MemberMetadata]
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
@@ -172,6 +175,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
var protocol: String = null
var newMemberAdded: Boolean = false
+ def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
def is(groupState: GroupState) = state == groupState
def not(groupState: GroupState) = state != groupState
def has(memberId: String) = members.contains(memberId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7519dc4..0298509 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -85,7 +85,7 @@ class GroupMetadataManager(brokerId: Int,
newGauge("NumOffsets",
new Gauge[Int] {
def value = groupMetadataCache.values.map(group => {
- group synchronized { group.numOffsets }
+ group.inLock { group.numOffsets }
}).sum
}
)
@@ -243,6 +243,7 @@ class GroupMetadataManager(brokerId: Int,
internalTopicsAllowed = true,
isFromClient = false,
entriesPerPartition = records,
+ delayedProduceLock = Some(group.lock),
responseCallback = callback)
}
@@ -260,7 +261,7 @@ class GroupMetadataManager(brokerId: Int,
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
- group synchronized {
+ group.inLock {
if (!group.hasReceivedConsistentOffsetCommits)
warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " +
s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
@@ -309,7 +310,7 @@ class GroupMetadataManager(brokerId: Int,
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
- val responseError = group synchronized {
+ val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
@@ -369,12 +370,12 @@ class GroupMetadataManager(brokerId: Int,
}
if (isTxnOffsetCommit) {
- group synchronized {
+ group.inLock {
addProducerGroup(producerId, group.groupId)
group.prepareTxnOffsetCommit(producerId, offsetMetadata)
}
} else {
- group synchronized {
+ group.inLock {
group.prepareOffsetCommit(offsetMetadata)
}
}
@@ -403,7 +404,7 @@ class GroupMetadataManager(brokerId: Int,
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
}.toMap
} else {
- group synchronized {
+ group.inLock {
if (group.is(Dead)) {
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
@@ -675,7 +676,7 @@ class GroupMetadataManager(brokerId: Int,
var offsetsRemoved = 0
groupMetadataCache.foreach { case (groupId, group) =>
- val (removedOffsets, groupIsDead, generation) = group synchronized {
+ val (removedOffsets, groupIsDead, generation) = group.inLock {
val removedOffsets = deletedTopicPartitions match {
case Some(topicPartitions) => group.removeOffsets(topicPartitions)
case None => group.removeExpiredOffsets(startMs)
@@ -747,7 +748,7 @@ class GroupMetadataManager(brokerId: Int,
val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
pendingGroups.foreach { case (groupId) =>
getGroup(groupId) match {
- case Some(group) => group synchronized {
+ case Some(group) => group.inLock {
if (!group.is(Dead)) {
group.completePendingTxnOffsetCommit(producerId, isCommit)
removeProducerGroup(producerId, groupId)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
index bc0f1b7..cf18b81 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/DelayedTxnMarker.scala
@@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
import kafka.server.DelayedOperation
import org.apache.kafka.common.protocol.Errors
@@ -25,11 +26,12 @@ import org.apache.kafka.common.protocol.Errors
* Delayed transaction state change operations that are added to the purgatory without timeout (i.e. these operations should never time out)
*/
private[transaction] class DelayedTxnMarker(txnMetadata: TransactionMetadata,
- completionCallback: Errors => Unit)
- extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365)) {
+ completionCallback: Errors => Unit,
+ lock: Lock)
+ extends DelayedOperation(TimeUnit.DAYS.toMillis(100 * 365), Some(lock)) {
override def tryComplete(): Boolean = {
- txnMetadata synchronized {
+ txnMetadata.inLock {
if (txnMetadata.topicPartitions.isEmpty)
forceComplete()
else false
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 6b9b7ef..0b38dbc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -137,7 +137,7 @@ class TransactionCoordinator(brokerId: Int,
val coordinatorEpoch = existingEpochAndMetadata.coordinatorEpoch
val txnMetadata = existingEpochAndMetadata.transactionMetadata
- txnMetadata synchronized {
+ txnMetadata.inLock {
prepareInitProduceIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata)
}
}
@@ -241,7 +241,7 @@ class TransactionCoordinator(brokerId: Int,
val txnMetadata = epochAndMetadata.transactionMetadata
// generate the new transaction metadata with added partitions
- txnMetadata synchronized {
+ txnMetadata.inLock {
if (txnMetadata.producerId != producerId) {
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
} else if (txnMetadata.producerEpoch != producerEpoch) {
@@ -304,7 +304,7 @@ class TransactionCoordinator(brokerId: Int,
val txnMetadata = epochAndTxnMetadata.transactionMetadata
val coordinatorEpoch = epochAndTxnMetadata.coordinatorEpoch
- txnMetadata synchronized {
+ txnMetadata.inLock {
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (txnMetadata.producerEpoch != producerEpoch)
@@ -368,7 +368,7 @@ class TransactionCoordinator(brokerId: Int,
case Some(epochAndMetadata) =>
if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
val txnMetadata = epochAndMetadata.transactionMetadata
- txnMetadata synchronized {
+ txnMetadata.inLock {
if (txnMetadata.producerId != producerId)
Left(Errors.INVALID_PRODUCER_ID_MAPPING)
else if (txnMetadata.producerEpoch != producerEpoch)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 6c13de4..4fc9db2 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -263,7 +263,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
}
- val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback)
+ val delayedTxnMarker = new DelayedTxnMarker(txnMetadata, appendToLogCallback, txnStateManager.stateReadLock)
txnMarkerPurgatory.tryCompleteElseWatch(delayedTxnMarker, Seq(transactionalId))
addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId, txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
@@ -340,7 +340,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
val txnMetadata = epochAndMetadata.transactionMetadata
- txnMetadata synchronized {
+ txnMetadata.inLock {
topicPartitions.foreach(txnMetadata.removePartition)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index bfa25be..fefe767 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -129,7 +129,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
txnMarkerChannelManager.removeMarkersForTxnId(transactionalId)
abortSending = true
} else {
- txnMetadata synchronized {
+ txnMetadata.inLock {
for ((topicPartition, error) <- errors.asScala) {
error match {
case Errors.NONE =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index 405e639..486a887 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -16,7 +16,9 @@
*/
package kafka.coordinator.transaction
-import kafka.utils.{Logging, nonthreadsafe}
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.RecordBatch
@@ -156,6 +158,10 @@ private[transaction] class TransactionMetadata(val transactionalId: String,
// initialized as the same as the current state
var pendingState: Option[TransactionState] = None
+ private[transaction] val lock = new ReentrantLock
+
+ def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun)
+
def addPartitions(partitions: collection.Set[TopicPartition]): Unit = {
topicPartitions ++= partitions
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index ad5d33b..f2e25c4 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -56,6 +56,16 @@ object TransactionStateManager {
* 1. the transaction log, which is a special internal topic.
* 2. the transaction metadata including its ongoing transaction status.
* 3. the background expiration of the transaction as well as the transactional id.
+ *
+ * <b>Delayed operation locking notes:</b>
+ * Delayed operations in TransactionStateManager use `stateLock.readLock` as the delayed operation
+ * lock. Delayed operations are completed only if `stateLock.readLock` can be acquired.
+ * Delayed callbacks may acquire `stateLock.readLock` or any of the `txnMetadata` locks.
+ * <ul>
+ * <li>`stateLock.readLock` must never be acquired while holding `txnMetadata` lock.</li>
+ * <li>`txnMetadata` lock must never be acquired while holding `stateLock.writeLock`.</li>
+ * <li>`ReplicaManager.appendRecords` should never be invoked while holding a `txnMetadata` lock.</li>
+ * </ul>
*/
class TransactionStateManager(brokerId: Int,
zkUtils: ZkUtils,
@@ -95,6 +105,7 @@ class TransactionStateManager(brokerId: Int,
loadingPartitions.add(partitionAndLeaderEpoch)
}
}
+ private[transaction] def stateReadLock = stateLock.readLock
// this is best-effort expiration of an ongoing transaction which has been open for more than its
// txn timeout value, we do not need to grab the lock on the metadata object upon checking its state
@@ -136,7 +147,7 @@ class TransactionStateManager(brokerId: Int,
}.filter { case (_, txnMetadata) =>
txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
}.map { case (transactionalId, txnMetadata) =>
- val txnMetadataTransition = txnMetadata synchronized {
+ val txnMetadataTransition = txnMetadata.inLock {
txnMetadata.prepareDead()
}
TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
@@ -166,7 +177,7 @@ class TransactionStateManager(brokerId: Int,
.foreach { txnMetadataCacheEntry =>
toRemove.foreach { idCoordinatorEpochAndMetadata =>
val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(idCoordinatorEpochAndMetadata.transactionalId)
- txnMetadata synchronized {
+ txnMetadata.inLock {
if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
&& txnMetadata.pendingState.contains(Dead)
&& txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
@@ -196,7 +207,8 @@ class TransactionStateManager(brokerId: Int,
internalTopicsAllowed = true,
isFromClient = false,
recordsPerPartition,
- removeFromCacheCallback
+ removeFromCacheCallback,
+ Some(stateLock.readLock)
)
}
@@ -376,7 +388,7 @@ class TransactionStateManager(brokerId: Int,
val transactionsPendingForCompletion = new mutable.ListBuffer[TransactionalIdCoordinatorEpochAndTransitMetadata]
loadedTransactions.foreach {
case (transactionalId, txnMetadata) =>
- txnMetadata synchronized {
+ txnMetadata.inLock {
// if state is PrepareCommit or PrepareAbort we need to complete the transaction
txnMetadata.state match {
case PrepareAbort =>
@@ -509,7 +521,7 @@ class TransactionStateManager(brokerId: Int,
case Right(Some(epochAndMetadata)) =>
val metadata = epochAndMetadata.transactionMetadata
- metadata synchronized {
+ metadata.inLock {
if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
// the cache may have been changed due to txn topic partition emigration and immigration,
// in this case directly return NOT_COORDINATOR to client and let it to re-discover the transaction coordinator
@@ -536,7 +548,7 @@ class TransactionStateManager(brokerId: Int,
getTransactionState(transactionalId) match {
case Right(Some(epochAndTxnMetadata)) =>
val metadata = epochAndTxnMetadata.transactionMetadata
- metadata synchronized {
+ metadata.inLock {
if (epochAndTxnMetadata.coordinatorEpoch == coordinatorEpoch) {
if (retryOnError(responseError)) {
info(s"TransactionalId ${metadata.transactionalId} append transaction log for $newMetadata transition failed due to $responseError, " +
@@ -586,24 +598,28 @@ class TransactionStateManager(brokerId: Int,
case Right(Some(epochAndMetadata)) =>
val metadata = epochAndMetadata.transactionMetadata
- metadata synchronized {
+ val append: Boolean = metadata.inLock {
if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
// the coordinator epoch has changed, reply to client immediately with with NOT_COORDINATOR
responseCallback(Errors.NOT_COORDINATOR)
+ false
} else {
// do not need to check the metadata object itself since no concurrent thread should be able to modify it
// under the same coordinator epoch, so directly append to txn log now
-
- replicaManager.appendRecords(
+ true
+ }
+ }
+ if (append) {
+ replicaManager.appendRecords(
newMetadata.txnTimeoutMs.toLong,
TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
isFromClient = false,
recordsPerPartition,
- updateCacheCallback)
+ updateCacheCallback,
+ delayedProduceLock = Some(stateLock.readLock))
trace(s"Appending new metadata $newMetadata for transaction id $transactionalId with coordinator epoch $coordinatorEpoch to the local transaction log")
- }
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 86bf1ff..894d30e 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -19,7 +19,7 @@ package kafka.server
import java.util.concurrent._
import java.util.concurrent.atomic._
-import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
+import java.util.concurrent.locks.{Lock, ReentrantLock, ReentrantReadWriteLock}
import com.yammer.metrics.core.Gauge
import kafka.metrics.KafkaMetricsGroup
@@ -43,11 +43,12 @@ import scala.collection.mutable.ListBuffer
*
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
*/
-abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
+abstract class DelayedOperation(override val delayMs: Long,
+ lockOpt: Option[Lock] = None) extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)
// Visible for testing
- private[server] val lock: ReentrantLock = new ReentrantLock
+ private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
/*
* Force completing the delayed operation, if not already completed.
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index ebbd9ee..718ed24 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Meter
import kafka.metrics.KafkaMetricsGroup
@@ -54,8 +55,9 @@ case class ProduceMetadata(produceRequiredAcks: Short,
class DelayedProduce(delayMs: Long,
produceMetadata: ProduceMetadata,
replicaManager: ReplicaManager,
- responseCallback: Map[TopicPartition, PartitionResponse] => Unit)
- extends DelayedOperation(delayMs) {
+ responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+ lockOpt: Option[Lock] = None)
+ extends DelayedOperation(delayMs, lockOpt) {
// first update the acks pending variable according to the error code
produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index e5c9432..43ecc44 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.io.File
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.locks.Lock
import com.yammer.metrics.core.Gauge
import kafka.api._
@@ -450,6 +451,7 @@ class ReplicaManager(val config: KafkaConfig,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
+ delayedProduceLock: Option[Lock] = None,
processingStatsCallback: Map[TopicPartition, RecordsProcessingStats] => Unit = _ => ()) {
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
@@ -469,7 +471,7 @@ class ReplicaManager(val config: KafkaConfig,
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// create delayed produce operation
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
- val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)
+ val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
// create a list of (topic, partition) pairs to use as keys for this delayed produce operation
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 3fed45d..7075c63 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
import org.easymock.{Capture, EasyMock, IAnswer}
import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
import org.apache.kafka.common.internals.Topic
import org.junit.Assert._
@@ -1367,6 +1368,7 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -1450,6 +1452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
@@ -1479,6 +1482,7 @@ class GroupCoordinatorTest extends JUnitSuite {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 46a1878..78a5eaa 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.common.internals.Topic
import scala.collection.JavaConverters._
import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
class GroupMetadataManagerTest {
@@ -1306,6 +1307,7 @@ class GroupMetadataManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
)
EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1320,6 +1322,7 @@ class GroupMetadataManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer = capturedArgument.getValue.apply(
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 0bc1c9f..a039c53 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -16,6 +16,8 @@
*/
package kafka.coordinator.transaction
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache}
import kafka.utils.timer.MockTimer
import kafka.utils.TestUtils
@@ -86,7 +88,10 @@ class TransactionMarkerChannelManagerTest {
EasyMock.expect(txnStateManager.getTransactionState(EasyMock.eq(transactionalId2)))
.andReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata2))))
.anyTimes()
-
+ val stateLock = new ReentrantReadWriteLock
+ EasyMock.expect(txnStateManager.stateReadLock)
+ .andReturn(stateLock.readLock)
+ .anyTimes()
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 0a2b641..7973b9a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -17,6 +17,7 @@
package kafka.coordinator.transaction
import java.nio.ByteBuffer
+import java.util.concurrent.locks.ReentrantLock
import kafka.log.Log
import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager}
@@ -498,6 +499,7 @@ class TransactionStateManagerTest {
EasyMock.eq(false),
EasyMock.eq(recordsByPartition),
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject()
)).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
@@ -598,6 +600,7 @@ class TransactionStateManagerTest {
isFromClient = EasyMock.eq(false),
EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
EasyMock.capture(capturedArgument),
+ EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]],
EasyMock.anyObject())
).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = capturedArgument.getValue.apply(
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 5b1daff..d4d79e5 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.util.concurrent.{Executors, Future}
+import java.util.concurrent.locks.ReentrantLock
import kafka.utils.CoreUtils.inLock
@@ -123,12 +124,28 @@ class DelayedOperationTest {
@Test
def testDelayedOperationLock() {
+ verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false)
+ }
+
+ @Test
+ def testDelayedOperationLockOverride() {
+ def newMockOperation = {
+ val lock = new ReentrantLock
+ new MockDelayedOperation(100000L, Some(lock), Some(lock))
+ }
+ verifyDelayedOperationLock(newMockOperation, mismatchedLocks = false)
+
+ verifyDelayedOperationLock(new MockDelayedOperation(100000L, None, Some(new ReentrantLock)),
+ mismatchedLocks = true)
+ }
+
+ def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean) {
val key = "key"
val executorService = Executors.newSingleThreadExecutor
try {
def createDelayedOperations(count: Int): Seq[MockDelayedOperation] = {
(1 to count).map { _ =>
- val op = new MockDelayedOperation(100000L)
+ val op = mockDelayedOperation
purgatory.tryCompleteElseWatch(op, Seq(key))
assertFalse("Not completable", op.isCompleted)
op
@@ -137,7 +154,7 @@ class DelayedOperationTest {
def createCompletableOperations(count: Int): Seq[MockDelayedOperation] = {
(1 to count).map { _ =>
- val op = new MockDelayedOperation(100000L)
+ val op = mockDelayedOperation
op.completable = true
op
}
@@ -181,6 +198,27 @@ class DelayedOperationTest {
checkAndComplete(ops, Seq(ops(1)))
} finally {
runOnAnotherThread(ops(0).lock.unlock(), true)
+ checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+ }
+
+ // Lock acquired by response callback held by another thread, should not block
+ // if the response lock is used as operation lock, only operations
+ // that can be locked without blocking on the current thread should complete
+ ops = createDelayedOperations(2)
+ ops(0).responseLockOpt.foreach { lock =>
+ runOnAnotherThread(lock.lock(), true)
+ try {
+ try {
+ checkAndComplete(ops, Seq(ops(1)))
+ assertFalse("Should have failed with mismatched locks", mismatchedLocks)
+ } catch {
+ case e: IllegalStateException =>
+ assertTrue("Should not have failed with valid locks", mismatchedLocks)
+ }
+ } finally {
+ runOnAnotherThread(lock.unlock(), true)
+ checkAndComplete(Seq(ops(0)), Seq(ops(0)))
+ }
}
// Immediately completable operations should complete without locking
@@ -196,8 +234,9 @@ class DelayedOperationTest {
}
- class MockDelayedOperation(delayMs: Long)
- extends DelayedOperation(delayMs) {
+ class MockDelayedOperation(delayMs: Long,
+ lockOpt: Option[ReentrantLock] = None,
+ val responseLockOpt: Option[ReentrantLock] = None) extends DelayedOperation(delayMs, lockOpt) {
var completable = false
def awaitExpiration() {
@@ -218,6 +257,10 @@ class DelayedOperationTest {
}
override def onComplete() {
+ responseLockOpt.foreach { lock =>
+ if (!lock.tryLock())
+ throw new IllegalStateException("Response callback lock could not be acquired in callback")
+ }
synchronized {
notify()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5ee15712/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 76ae35b..508bc35 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -180,6 +180,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -218,6 +219,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.capture(responseCallback),
+ EasyMock.anyObject(),
EasyMock.anyObject())).andAnswer(new IAnswer[Unit] {
override def answer(): Unit = {
responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))
@@ -248,6 +250,7 @@ class KafkaApisTest {
EasyMock.eq(false),
EasyMock.anyObject(),
EasyMock.anyObject(),
+ EasyMock.anyObject(),
EasyMock.anyObject()))
EasyMock.replay(replicaManager)