Posted to commits@kafka.apache.org by ju...@apache.org on 2020/09/09 21:44:28 UTC
[kafka] branch trunk updated: KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c2273ad KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657)
c2273ad is described below
commit c2273adc25b2bab0a3ac95bf7844fedf2860b40b
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Sep 10 05:42:37 2020 +0800
KAFKA-8334 Make sure the thread which tries to complete delayed reque… (#8657)
The main changes in this PR are:
1. replace tryLock with lock in DelayedOperation#maybeTryComplete
2. complete delayed requests without holding the group lock
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
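Taken together, the change replaces the old tryLock/retry dance with a simple pattern: code that holds partition or group locks only enqueues the completion checks, and the checks run later from a context holding no conflicting locks. A minimal standalone sketch of that flow (simplified, hypothetical names; the real broker wires this through ReplicaManager and KafkaApis):

import java.util.concurrent.ConcurrentLinkedQueue

object CompletionFlowSketch extends App {
  // Stand-in for ReplicaManager's ActionQueue.
  private val actionQueue = new ConcurrentLinkedQueue[() => Unit]()

  // While appendRecords() may hold partition/group locks, it only *enqueues*
  // the "check and complete delayed requests" work instead of running it.
  def appendRecords(partition: String): Unit =
    actionQueue.add(() => println(s"checkAndComplete(delayed ops for $partition)"))

  // Drained at the end of KafkaApis.handle() (and DelayedJoin.onExpiration),
  // where no conflicting locks are held.
  def tryCompleteActions(): Unit = {
    var action = actionQueue.poll()
    while (action != null) { action(); action = actionQueue.poll() }
  }

  appendRecords("topic-0")   // locks held here in the real broker
  tryCompleteActions()       // locks released before this point
}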
---
core/src/main/scala/kafka/cluster/Partition.scala | 22 +---
.../kafka/coordinator/group/DelayedJoin.scala | 11 +-
.../coordinator/group/GroupMetadataManager.scala | 2 +-
core/src/main/scala/kafka/log/Log.scala | 13 ++-
core/src/main/scala/kafka/server/ActionQueue.scala | 56 ++++++++++
.../main/scala/kafka/server/DelayedOperation.scala | 118 +++++++++------------
core/src/main/scala/kafka/server/KafkaApis.scala | 4 +
.../main/scala/kafka/server/ReplicaManager.scala | 31 ++++++
.../AbstractCoordinatorConcurrencyTest.scala | 10 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 65 ++++++++----
.../TransactionCoordinatorConcurrencyTest.scala | 5 +-
.../unit/kafka/server/DelayedOperationTest.scala | 96 +++++++----------
12 files changed, 257 insertions(+), 176 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index fb0576e..fc0852f 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -103,19 +103,7 @@ class DelayedOperations(topicPartition: TopicPartition,
fetch.checkAndComplete(TopicPartitionOperationKey(topicPartition))
}
- def checkAndCompleteProduce(): Unit = {
- produce.checkAndComplete(TopicPartitionOperationKey(topicPartition))
- }
-
- def checkAndCompleteDeleteRecords(): Unit = {
- deleteRecords.checkAndComplete(TopicPartitionOperationKey(topicPartition))
- }
-
def numDelayedDelete: Int = deleteRecords.numDelayed
-
- def numDelayedFetch: Int = fetch.numDelayed
-
- def numDelayedProduce: Int = produce.numDelayed
}
object Partition extends KafkaMetricsGroup {
@@ -1010,15 +998,7 @@ class Partition(val topicPartition: TopicPartition,
}
}
- // some delayed operations may be unblocked after HW changed
- if (leaderHWIncremented)
- tryCompleteDelayedRequests()
- else {
- // probably unblock some follower fetch requests since log end offset has been updated
- delayedOperations.checkAndCompleteFetch()
- }
-
- info
+ info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
def readRecords(fetchOffset: Long,
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index dad2b1e..92e8835 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -36,8 +36,15 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
- override def onExpiration() = coordinator.onExpireJoin()
- override def onComplete() = coordinator.onCompleteJoin(group)
+ override def onExpiration(): Unit = {
+ coordinator.onExpireJoin()
+ // try to complete delayed actions introduced by coordinator.onCompleteJoin
+ tryToCompleteDelayedAction()
+ }
+ override def onComplete(): Unit = coordinator.onCompleteJoin(group)
+
+ // TODO: remove this ugly chain after we move the action queue to the handler thread
+ private def tryToCompleteDelayedAction(): Unit = coordinator.groupManager.replicaManager.tryCompleteActions()
}
/**
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 1fcdd91..9d58b2d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -56,7 +56,7 @@ import scala.jdk.CollectionConverters._
class GroupMetadataManager(brokerId: Int,
interBrokerProtocolVersion: ApiVersion,
config: OffsetConfig,
- replicaManager: ReplicaManager,
+ val replicaManager: ReplicaManager,
zkClient: KafkaZkClient,
time: Time,
metrics: Metrics) extends Logging with KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index fa78666..8953156 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -68,6 +68,13 @@ object LogAppendInfo {
offsetsMonotonic = false, -1L, recordErrors, errorMessage)
}
+sealed trait LeaderHwChange
+object LeaderHwChange {
+ case object Increased extends LeaderHwChange
+ case object Same extends LeaderHwChange
+ case object None extends LeaderHwChange
+}
+
/**
* Struct to hold various quantities we compute about each message set before appending to the log
*
@@ -85,6 +92,9 @@ object LogAppendInfo {
* @param validBytes The number of valid bytes
* @param offsetsMonotonic Are the offsets in this message set monotonically increasing
* @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHwChange Increased if the high watermark needs to be increased after appending records.
+ * Same if the high watermark is not changed. None is the default value, meaning the append failed.
+ *
*/
case class LogAppendInfo(var firstOffset: Option[Long],
var lastOffset: Long,
@@ -100,7 +110,8 @@ case class LogAppendInfo(var firstOffset: Option[Long],
offsetsMonotonic: Boolean,
lastOffsetOfFirstBatch: Long,
recordErrors: Seq[RecordError] = List(),
- errorMessage: String = null) {
+ errorMessage: String = null,
+ leaderHwChange: LeaderHwChange = LeaderHwChange.None) {
/**
* Get the first offset if it exists, else get the last offset of the first batch
* For magic versions 2 and newer, this method will return first offset. For magic versions
diff --git a/core/src/main/scala/kafka/server/ActionQueue.scala b/core/src/main/scala/kafka/server/ActionQueue.scala
new file mode 100644
index 0000000..1b6b832
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ActionQueue.scala
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import kafka.utils.Logging
+
+/**
+ * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords
+ * produces record changes, so we need to check and complete delayed requests. In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue extends Logging {
+ private val queue = new ConcurrentLinkedQueue[() => Unit]()
+
+ /**
+ * add action to this queue.
+ * @param action action
+ */
+ def add(action: () => Unit): Unit = queue.add(action)
+
+ /**
+ * try to complete all delayed actions
+ */
+ def tryCompleteActions(): Unit = {
+ val maxToComplete = queue.size()
+ var count = 0
+ var done = false
+ while (!done && count < maxToComplete) {
+ try {
+ val action = queue.poll()
+ if (action == null) done = true
+ else action()
+ } catch {
+ case e: Throwable =>
+ error("failed to complete delayed actions", e)
+ } finally count += 1
+ }
+ }
+}
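One subtlety worth noting in tryCompleteActions(): it snapshots queue.size() before draining, so an action enqueued while draining (for example, by an action that itself triggers another append) waits for the next call instead of extending the current pass indefinitely. A toy illustration of that bound (standalone sketch, not broker code):

import java.util.concurrent.ConcurrentLinkedQueue

object BoundedDrainSketch extends App {
  val queue = new ConcurrentLinkedQueue[() => Unit]()

  def tryCompleteActions(): Unit = {
    val maxToComplete = queue.size()   // snapshot bounds this pass
    var count = 0
    var done = false
    while (!done && count < maxToComplete) {
      val action = queue.poll()
      if (action == null) done = true
      else action()                    // may enqueue further actions
      count += 1
    }
  }

  // An action that re-enqueues work: without the snapshot, a single drain
  // could spin forever; with it, the new action waits for the next drain.
  queue.add(() => queue.add(() => println("runs on the next drain")))
  tryCompleteActions()
  println(queue.size())                // 1: the re-enqueued action remains
}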
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 2756b4f..09fd337 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -41,13 +41,15 @@ import scala.collection.mutable.ListBuffer
* forceComplete().
*
* A subclass of DelayedOperation needs to provide an implementation of both onComplete() and tryComplete().
+ *
+ * Note that if you add a future delayed operation that calls ReplicaManager.appendRecords() in onComplete()
+ * like DelayedJoin, you must be aware that this operation's onExpiration() needs to call actionQueue.tryCompleteActions().
*/
abstract class DelayedOperation(override val delayMs: Long,
lockOpt: Option[Lock] = None)
extends TimerTask with Logging {
private val completed = new AtomicBoolean(false)
- private val tryCompletePending = new AtomicBoolean(false)
// Visible for testing
private[server] val lock: Lock = lockOpt.getOrElse(new ReentrantLock)
@@ -100,42 +102,24 @@ abstract class DelayedOperation(override val delayMs: Long,
def tryComplete(): Boolean
/**
- * Thread-safe variant of tryComplete() that attempts completion only if the lock can be acquired
- * without blocking.
- *
- * If threadA acquires the lock and performs the check for completion before completion criteria is met
- * and threadB satisfies the completion criteria, but fails to acquire the lock because threadA has not
- * yet released the lock, we need to ensure that completion is attempted again without blocking threadA
- * or threadB. `tryCompletePending` is set by threadB when it fails to acquire the lock and at least one
- * of threadA or threadB will attempt completion of the operation if this flag is set. This ensures that
- * every invocation of `maybeTryComplete` is followed by at least one invocation of `tryComplete` until
- * the operation is actually completed.
+ * Thread-safe variant of tryComplete() that calls an extra function if the first tryComplete() returns false
+ * @param f function to be executed after the first tryComplete() returns false
+ * @return the result of tryComplete()
*/
- private[server] def maybeTryComplete(): Boolean = {
- var retry = false
- var done = false
- do {
- if (lock.tryLock()) {
- try {
- tryCompletePending.set(false)
- done = tryComplete()
- } finally {
- lock.unlock()
- }
- // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
- // `tryCompletePending`. In this case we should retry.
- retry = tryCompletePending.get()
- } else {
- // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
- // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
- // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
- // released the lock and returned by the time the flag is set.
- retry = !tryCompletePending.getAndSet(true)
- }
- } while (!isCompleted && retry)
- done
+ private[server] def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
+ if (tryComplete()) true
+ else {
+ f
+ // last completion check
+ tryComplete()
+ }
}
+ /**
+ * Thread-safe variant of tryComplete()
+ */
+ private[server] def safeTryComplete(): Boolean = inLock(lock)(tryComplete())
+
/*
* run() method defines a task that is executed on timeout
*/
@@ -219,38 +203,38 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
assert(watchKeys.nonEmpty, "The watch key list can't be empty")
- // The cost of tryComplete() is typically proportional to the number of keys. Calling
- // tryComplete() for each key is going to be expensive if there are many keys. Instead,
- // we do the check in the following way. Call tryComplete(). If the operation is not completed,
- // we just add the operation to all keys. Then we call tryComplete() again. At this time, if
- // the operation is still not completed, we are guaranteed that it won't miss any future triggering
- // event since the operation is already on the watcher list for all keys. This does mean that
- // if the operation is completed (by another thread) between the two tryComplete() calls, the
- // operation is unnecessarily added for watch. However, this is a less severe issue since the
- // expire reaper will clean it up periodically.
-
- // At this point the only thread that can attempt this operation is this current thread
- // Hence it is safe to tryComplete() without a lock
- var isCompletedByMe = operation.tryComplete()
- if (isCompletedByMe)
- return true
-
- var watchCreated = false
- for(key <- watchKeys) {
- // If the operation is already completed, stop adding it to the rest of the watcher list.
- if (operation.isCompleted)
- return false
- watchForOperation(key, operation)
-
- if (!watchCreated) {
- watchCreated = true
- estimatedTotalOperations.incrementAndGet()
- }
- }
-
- isCompletedByMe = operation.maybeTryComplete()
- if (isCompletedByMe)
- return true
+ // The cost of tryComplete() is typically proportional to the number of keys. Calling tryComplete() for each key is
+ // going to be expensive if there are many keys. Instead, we do the check in the following way through safeTryCompleteOrElse().
+ // If the operation is not completed, we just add the operation to all keys. Then we call tryComplete() again. At
+ // this time, if the operation is still not completed, we are guaranteed that it won't miss any future triggering
+ // event since the operation is already on the watcher list for all keys.
+ //
+ // ==============[story about lock]==============
+ // Through safeTryCompleteOrElse(), we hold the operation's lock while adding the operation to the watch list and doing
+ // the tryComplete() check. This is to avoid a potential deadlock between the callers to tryCompleteElseWatch() and
+ // checkAndComplete(). For example, the following deadlock can happen if the lock is only held for the final tryComplete():
+ // 1) thread_a holds the read lock of stateLock from TransactionStateManager
+ // 2) thread_a is executing tryCompleteElseWatch()
+ // 3) thread_a adds op to the watch list
+ // 4) thread_b requests the write lock of stateLock from TransactionStateManager (blocked by thread_a)
+ // 5) thread_c calls checkAndComplete() and holds the lock of op
+ // 6) thread_c is waiting for the read lock of stateLock to complete op (blocked by thread_b)
+ // 7) thread_a is waiting for the lock of op to call the final tryComplete() (blocked by thread_c)
+ //
+ // Note that even with the current approach, deadlocks could still be introduced. For example,
+ // 1) thread_a calls tryCompleteElseWatch() and acquires the lock of op
+ // 2) thread_a adds op to the watch list
+ // 3) thread_a calls op#tryComplete and tries to acquire lock_b
+ // 4) thread_b holds lock_b and calls checkAndComplete()
+ // 5) thread_b sees op in the watch list
+ // 6) thread_b needs the lock of op
+ // To avoid the above scenario, we recommend DelayedOperationPurgatory.checkAndComplete() be called without holding
+ // any exclusive lock. Since DelayedOperationPurgatory.checkAndComplete() completes delayed operations asynchronously,
+ // holding an exclusive lock to make the call is often unnecessary.
+ if (operation.safeTryCompleteOrElse {
+ watchKeys.foreach(key => watchForOperation(key, operation))
+ if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
+ }) return true
// if it cannot be completed by now and hence is watched, add to the expire queue also
if (!operation.isCompleted) {
@@ -375,7 +359,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
if (curr.isCompleted) {
// another thread has completed this operation, just remove it
iter.remove()
- } else if (curr.maybeTryComplete()) {
+ } else if (curr.safeTryComplete()) {
iter.remove()
completed += 1
}
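The contract of safeTryCompleteOrElse() is easiest to see in isolation: everything runs under the operation's single lock, the fallback runs only when the first completion attempt fails, and a final attempt follows the fallback so a trigger that fired while the watchers were being registered is not missed. A toy standalone version (mirroring the shape above; not the Kafka class itself):

import java.util.concurrent.locks.ReentrantLock

object SafeTryCompleteSketch extends App {
  private val lock = new ReentrantLock()
  @volatile private var completable = false
  private var watched = false

  private def tryComplete(): Boolean = completable

  // Same shape as DelayedOperation.safeTryCompleteOrElse: lock, first
  // attempt, fallback (e.g. registering watchers), then a last attempt.
  def safeTryCompleteOrElse(f: => Unit): Boolean = {
    lock.lock()
    try {
      if (tryComplete()) true
      else {
        f                 // e.g. add the operation to all watch keys
        tryComplete()     // last completion check, still under the lock
      }
    } finally lock.unlock()
  }

  println(safeTryCompleteOrElse { watched = true })  // false: not completable yet
  println(watched)                                   // true: fallback ran
  completable = true
  println(safeTryCompleteOrElse { watched = false }) // true: fallback skipped
}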
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 867ff6a..ef2df97 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -186,6 +186,10 @@ class KafkaApis(val requestChannel: RequestChannel,
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
+ // Try to complete delayed actions. In order to avoid conflicting locking, the actions to complete delayed requests
+ // are kept in a queue. The queue is checked at the end of KafkaApis.handle() and from the expiration thread for
+ // certain delayed operations (e.g. DelayedJoin).
+ replicaManager.tryCompleteActions()
// The local completion time may be set while processing the request. Only record it if it's unset.
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 32c9aa4..1ad37ef 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -559,9 +559,20 @@ class ReplicaManager(val config: KafkaConfig,
}
/**
+ * TODO: move this action queue to the handler thread so we can simplify concurrency handling
+ */
+ private val actionQueue = new ActionQueue
+
+ def tryCompleteActions(): Unit = actionQueue.tryCompleteActions()
+
+ /**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+ *
+ * Note that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+ * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+ * locks.
*/
def appendRecords(timeout: Long,
requiredAcks: Short,
@@ -585,6 +596,26 @@ class ReplicaManager(val config: KafkaConfig,
result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage)) // response status
}
+ actionQueue.add {
+ () =>
+ localProduceResults.foreach {
+ case (topicPartition, result) =>
+ val requestKey = TopicPartitionOperationKey(topicPartition)
+ result.info.leaderHwChange match {
+ case LeaderHwChange.Increased =>
+ // some delayed operations may be unblocked after HW changed
+ delayedProducePurgatory.checkAndComplete(requestKey)
+ delayedFetchPurgatory.checkAndComplete(requestKey)
+ delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+ case LeaderHwChange.Same =>
+ // probably unblock some follower fetch requests since log end offset has been updated
+ delayedFetchPurgatory.checkAndComplete(requestKey)
+ case LeaderHwChange.None =>
+ // nothing
+ }
+ }
+ }
+
recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index c7145ce..62ee85d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -17,8 +17,8 @@
package kafka.coordinator
-import java.util.{Collections, Random}
import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.{Collections, Random}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
@@ -97,7 +97,7 @@ abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] {
}
def enableCompletion(): Unit = {
- replicaManager.tryCompleteDelayedRequests()
+ replicaManager.tryCompleteActions()
scheduler.tick()
}
@@ -166,9 +166,8 @@ object AbstractCoordinatorConcurrencyTest {
producePurgatory = new DelayedOperationPurgatory[DelayedProduce]("Produce", timer, 1, reaperEnabled = false)
watchKeys = Collections.newSetFromMap(new ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
}
- def tryCompleteDelayedRequests(): Unit = {
- watchKeys.map(producePurgatory.checkAndComplete)
- }
+
+ override def tryCompleteActions(): Unit = watchKeys.map(producePurgatory.checkAndComplete)
override def appendRecords(timeout: Long,
requiredAcks: Short,
@@ -204,7 +203,6 @@ object AbstractCoordinatorConcurrencyTest {
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
watchKeys ++= producerRequestKeys
producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
- tryCompleteDelayedRequests()
}
override def getMagic(topicPartition: TopicPartition): Option[Byte] = {
Some(RecordBatch.MAGIC_VALUE_V2)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 72de4a1..1f54bd5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -18,6 +18,7 @@
package kafka.coordinator.group
import java.util.Properties
+import java.util.concurrent.locks.{Lock, ReentrantLock}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import kafka.common.OffsetAndMetadata
@@ -60,16 +61,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
new LeaveGroupOperation
)
- private val allOperationsWithTxn = Seq(
- new JoinGroupOperation,
- new SyncGroupOperation,
- new OffsetFetchOperation,
- new CommitTxnOffsetsOperation,
- new CompleteTxnOperation,
- new HeartbeatOperation,
- new LeaveGroupOperation
- )
-
var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = _
var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = _
var groupCoordinator: GroupCoordinator = _
@@ -119,12 +110,33 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
@Test
def testConcurrentTxnGoodPathSequence(): Unit = {
- verifyConcurrentOperations(createGroupMembers, allOperationsWithTxn)
+ verifyConcurrentOperations(createGroupMembers, Seq(
+ new JoinGroupOperation,
+ new SyncGroupOperation,
+ new OffsetFetchOperation,
+ new CommitTxnOffsetsOperation,
+ new CompleteTxnOperation,
+ new HeartbeatOperation,
+ new LeaveGroupOperation
+ ))
}
@Test
def testConcurrentRandomSequence(): Unit = {
- verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
+ /**
+ * handleTxnCommitOffsets does not complete delayed requests now, so it causes an error if handleTxnCompletion is
+ * executed before the delayed request completes. In random mode, we use this global lock to prevent such an error.
+ */
+ val lock = new ReentrantLock()
+ verifyConcurrentRandomSequences(createGroupMembers, Seq(
+ new JoinGroupOperation,
+ new SyncGroupOperation,
+ new OffsetFetchOperation,
+ new CommitTxnOffsetsOperation(lock = Some(lock)),
+ new CompleteTxnOperation(lock = Some(lock)),
+ new HeartbeatOperation,
+ new LeaveGroupOperation
+ ))
}
@Test
@@ -198,6 +210,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
groupCoordinator.handleJoinGroup(member.groupId, member.memberId, None, requireKnownMemberId = false, "clientId", "clientHost",
DefaultRebalanceTimeout, DefaultSessionTimeout,
protocolType, protocols, responseCallback)
+ replicaManager.tryCompleteActions()
}
override def awaitAndVerify(member: GroupMember): Unit = {
val joinGroupResult = await(member, DefaultRebalanceTimeout)
@@ -221,6 +234,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
groupCoordinator.handleSyncGroup(member.groupId, member.generationId, member.memberId,
Some(protocolType), Some(protocolName), member.groupInstanceId, Map.empty[String, Array[Byte]], responseCallback)
}
+ replicaManager.tryCompleteActions()
}
override def awaitAndVerify(member: GroupMember): Unit = {
val result = await(member, DefaultSessionTimeout)
@@ -238,6 +252,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def runWithCallback(member: GroupMember, responseCallback: HeartbeatCallback): Unit = {
groupCoordinator.handleHeartbeat(member.groupId, member.memberId,
member.groupInstanceId, member.generationId, responseCallback)
+ replicaManager.tryCompleteActions()
}
override def awaitAndVerify(member: GroupMember): Unit = {
val error = await(member, DefaultSessionTimeout)
@@ -252,6 +267,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
override def runWithCallback(member: GroupMember, responseCallback: OffsetFetchCallback): Unit = {
val (error, partitionData) = groupCoordinator.handleFetchOffsets(member.groupId, requireStable = true, None)
+ replicaManager.tryCompleteActions()
responseCallback(error, partitionData)
}
override def awaitAndVerify(member: GroupMember): Unit = {
@@ -271,6 +287,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
groupCoordinator.handleCommitOffsets(member.groupId, member.memberId,
member.groupInstanceId, member.generationId, offsets, responseCallback)
+ replicaManager.tryCompleteActions()
}
override def awaitAndVerify(member: GroupMember): Unit = {
val offsets = await(member, 500)
@@ -278,7 +295,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
}
}
- class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
+ class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends CommitOffsetsOperation {
override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
val tp = new TopicPartition("topic", 0)
val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
@@ -293,13 +310,17 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
responseCallback(errors)
}
- groupCoordinator.handleTxnCommitOffsets(member.group.groupId, producerId, producerEpoch,
- JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID,
- offsets, callbackWithTxnCompletion)
+ lock.foreach(_.lock())
+ try {
+ groupCoordinator.handleTxnCommitOffsets(member.group.groupId, producerId, producerEpoch,
+ JoinGroupRequest.UNKNOWN_MEMBER_ID, Option.empty, JoinGroupRequest.UNKNOWN_GENERATION_ID,
+ offsets, callbackWithTxnCompletion)
+ replicaManager.tryCompleteActions()
+ } finally lock.foreach(_.unlock())
}
}
- class CompleteTxnOperation extends GroupOperation[CompleteTxnCallbackParams, CompleteTxnCallback] {
+ class CompleteTxnOperation(lock: Option[Lock] = None) extends GroupOperation[CompleteTxnCallbackParams, CompleteTxnCallback] {
override def responseCallback(responsePromise: Promise[CompleteTxnCallbackParams]): CompleteTxnCallback = {
val callback: CompleteTxnCallback = error => responsePromise.success(error)
callback
@@ -307,9 +328,13 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
val producerId = 1000L
val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
- groupCoordinator.groupManager.handleTxnCompletion(producerId,
- offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
- responseCallback(Errors.NONE)
+ lock.foreach(_.lock())
+ try {
+ groupCoordinator.groupManager.handleTxnCompletion(producerId,
+ offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+ responseCallback(Errors.NONE)
+ } finally lock.foreach(_.unlock())
+
}
override def awaitAndVerify(member: GroupMember): Unit = {
val error = await(member, 500)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 1be4969..3788cb1 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -509,6 +509,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
class InitProducerIdOperation(val producerIdAndEpoch: Option[ProducerIdAndEpoch] = None) extends TxnOperation[InitProducerIdResult] {
override def run(txn: Transaction): Unit = {
transactionCoordinator.handleInitProducerId(txn.transactionalId, 60000, producerIdAndEpoch, resultCallback)
+ replicaManager.tryCompleteActions()
}
override def awaitAndVerify(txn: Transaction): Unit = {
val initPidResult = result.getOrElse(throw new IllegalStateException("InitProducerId has not completed"))
@@ -525,6 +526,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
txnMetadata.producerEpoch,
partitions,
resultCallback)
+ replicaManager.tryCompleteActions()
}
}
override def awaitAndVerify(txn: Transaction): Unit = {
@@ -597,12 +599,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
}
}
txnStateManager.enableTransactionalIdExpiration()
+ replicaManager.tryCompleteActions()
time.sleep(txnConfig.removeExpiredTransactionalIdsIntervalMs + 1)
}
override def await(): Unit = {
val (_, success) = TestUtils.computeUntilTrue({
- replicaManager.tryCompleteDelayedRequests()
+ replicaManager.tryCompleteActions()
transactions.forall(txn => transactionMetadata(txn).isEmpty)
})(identity)
assertTrue("Transaction not expired", success)
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index c29dfec..8f481f1 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -33,12 +33,12 @@ import scala.jdk.CollectionConverters._
class DelayedOperationTest {
- var purgatory: DelayedOperationPurgatory[MockDelayedOperation] = null
+ var purgatory: DelayedOperationPurgatory[DelayedOperation] = null
var executorService: ExecutorService = null
@Before
def setUp(): Unit = {
- purgatory = DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock")
+ purgatory = DelayedOperationPurgatory[DelayedOperation](purgatoryName = "mock")
}
@After
@@ -49,6 +49,43 @@ class DelayedOperationTest {
}
@Test
+ def testLockInTryCompleteElseWatch(): Unit = {
+ val op = new DelayedOperation(100000L) {
+ override def onExpiration(): Unit = {}
+ override def onComplete(): Unit = {}
+ override def tryComplete(): Boolean = {
+ assertTrue(lock.asInstanceOf[ReentrantLock].isHeldByCurrentThread)
+ false
+ }
+ override def safeTryComplete(): Boolean = {
+ fail("tryCompleteElseWatch should not use safeTryComplete")
+ super.safeTryComplete()
+ }
+ }
+ purgatory.tryCompleteElseWatch(op, Seq("key"))
+ }
+
+ @Test
+ def testSafeTryCompleteOrElse(): Unit = {
+ def op(shouldComplete: Boolean) = new DelayedOperation(100000L) {
+ override def onExpiration(): Unit = {}
+ override def onComplete(): Unit = {}
+ override def tryComplete(): Boolean = {
+ assertTrue(lock.asInstanceOf[ReentrantLock].isHeldByCurrentThread)
+ shouldComplete
+ }
+ }
+ var pass = false
+ assertFalse(op(false).safeTryCompleteOrElse {
+ pass = true
+ })
+ assertTrue(pass)
+ assertTrue(op(true).safeTryCompleteOrElse {
+ fail("this method should NOT be executed")
+ })
+ }
+
+ @Test
def testRequestSatisfaction(): Unit = {
val r1 = new MockDelayedOperation(100000L)
val r2 = new MockDelayedOperation(100000L)
@@ -193,44 +230,6 @@ class DelayedOperationTest {
}
/**
- * Verify that if there is lock contention between two threads attempting to complete,
- * completion is performed without any blocking in either thread.
- */
- @Test
- def testTryCompleteLockContention(): Unit = {
- executorService = Executors.newSingleThreadExecutor()
- val completionAttemptsRemaining = new AtomicInteger(Int.MaxValue)
- val tryCompleteSemaphore = new Semaphore(1)
- val key = "key"
-
- val op = new MockDelayedOperation(100000L, None, None) {
- override def tryComplete() = {
- val shouldComplete = completionAttemptsRemaining.decrementAndGet <= 0
- tryCompleteSemaphore.acquire()
- try {
- if (shouldComplete)
- forceComplete()
- else
- false
- } finally {
- tryCompleteSemaphore.release()
- }
- }
- }
-
- purgatory.tryCompleteElseWatch(op, Seq(key))
- completionAttemptsRemaining.set(2)
- tryCompleteSemaphore.acquire()
- val future = runOnAnotherThread(purgatory.checkAndComplete(key), shouldComplete = false)
- TestUtils.waitUntilTrue(() => tryCompleteSemaphore.hasQueuedThreads, "Not attempting to complete")
- purgatory.checkAndComplete(key) // this should not block even though lock is not free
- assertFalse("Operation should not have completed", op.isCompleted)
- tryCompleteSemaphore.release()
- future.get(10, TimeUnit.SECONDS)
- assertTrue("Operation should have completed", op.isCompleted)
- }
-
- /**
* Test `tryComplete` with multiple threads to verify that there are no timing windows
* when completion is not performed even if the thread that makes the operation completable
* may not be able to acquire the operation lock. Since it is difficult to test all scenarios,
@@ -280,23 +279,6 @@ class DelayedOperationTest {
ops.foreach { op => assertTrue("Operation should have completed", op.isCompleted) }
}
- @Test
- def testDelayedOperationLock(): Unit = {
- verifyDelayedOperationLock(new MockDelayedOperation(100000L), mismatchedLocks = false)
- }
-
- @Test
- def testDelayedOperationLockOverride(): Unit = {
- def newMockOperation = {
- val lock = new ReentrantLock
- new MockDelayedOperation(100000L, Some(lock), Some(lock))
- }
- verifyDelayedOperationLock(newMockOperation, mismatchedLocks = false)
-
- verifyDelayedOperationLock(new MockDelayedOperation(100000L, None, Some(new ReentrantLock)),
- mismatchedLocks = true)
- }
-
def verifyDelayedOperationLock(mockDelayedOperation: => MockDelayedOperation, mismatchedLocks: Boolean): Unit = {
val key = "key"
executorService = Executors.newSingleThreadExecutor