You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/18 20:34:54 UTC
[kafka] branch 1.1 updated: KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new be58d73 KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
be58d73 is described below
commit be58d7370667e8f0670a3e480674794e6cbb8231
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri May 18 21:30:12 2018 +0100
KAFKA-6917; Process txn completion asynchronously to avoid deadlock (#5036)
Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 11 ++++----
.../coordinator/group/GroupMetadataManager.scala | 30 ++++++++++++++++------
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 24 +++++++++--------
.../coordinator/group/GroupCoordinatorTest.scala | 23 +++++++++++------
5 files changed, 56 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 5ae8552..6cc0a41 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -81,8 +81,7 @@ class GroupCoordinator(val brokerId: Int,
*/
def startup(enableMetadataExpiration: Boolean = true) {
info("Starting up.")
- if (enableMetadataExpiration)
- groupManager.enableMetadataExpiration()
+ groupManager.startup(enableMetadataExpiration)
isActive.set(true)
info("Startup complete.")
}
@@ -488,12 +487,12 @@ class GroupCoordinator(val brokerId: Int,
}
}
- def handleTxnCompletion(producerId: Long,
- offsetsPartitions: Iterable[TopicPartition],
- transactionResult: TransactionResult) {
+ def scheduleHandleTxnCompletion(producerId: Long,
+ offsetsPartitions: Iterable[TopicPartition],
+ transactionResult: TransactionResult) {
require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
val isCommit = transactionResult == TransactionResult.COMMIT
- groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+ groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
}
private def doCommitOffsets(group: GroupMetadata,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 7d4fe03..44a9369 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -135,13 +135,14 @@ class GroupMetadataManager(brokerId: Int,
})
})
- def enableMetadataExpiration() {
+ def startup(enableMetadataExpiration: Boolean) {
scheduler.startup()
-
- scheduler.schedule(name = "delete-expired-group-metadata",
- fun = cleanupGroupMetadata,
- period = config.offsetsRetentionCheckIntervalMs,
- unit = TimeUnit.MILLISECONDS)
+ if (enableMetadataExpiration) {
+ scheduler.schedule(name = "delete-expired-group-metadata",
+ fun = cleanupGroupMetadata,
+ period = config.offsetsRetentionCheckIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
+ }
}
def currentGroups: Iterable[GroupMetadata] = groupMetadataCache.values
@@ -794,7 +795,20 @@ class GroupMetadataManager(brokerId: Int,
info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
}
- def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean) {
+ /**
+ * Complete pending transactional offset commits of the groups of `producerId` from the provided
+ * `completedPartitions`. This method is invoked when a commit or abort marker is fully written
+ * to the log. It may be invoked when a group lock is held by the caller, for instance when delayed
+ * operations are completed while appending offsets for a group. Since we need to acquire one or
+ * more group metadata locks to handle transaction completion, this operation is scheduled on
+ * the scheduler thread to avoid deadlocks.
+ */
+ def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
+ scheduler.schedule(s"handleTxnCompletion-$producerId", () =>
+ handleTxnCompletion(producerId, completedPartitions, isCommit))
+ }
+
+ private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
val pendingGroups = groupsBelongingToPartitions(producerId, completedPartitions)
pendingGroups.foreach { case (groupId) =>
getGroup(groupId) match {
@@ -803,7 +817,7 @@ class GroupMetadataManager(brokerId: Int,
group.completePendingTxnOffsetCommit(producerId, isCommit)
removeProducerGroup(producerId, groupId)
}
- }
+ }
case _ =>
info(s"Group $groupId has moved away from $brokerId after transaction marker was written but before the " +
s"cache was updated. The cache on the new group owner will be updated instead.")
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e79afa..5d68c98 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1623,7 +1623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// as soon as the end transaction marker has been written for a transactional offset commit,
// call to the group coordinator to materialize the offsets into the cache
try {
- groupCoordinator.handleTxnCompletion(producerId, successfulOffsetsPartitions, result)
+ groupCoordinator.scheduleHandleTxnCompletion(producerId, successfulOffsetsPartitions, result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 44e1356..befd22a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -27,7 +27,7 @@ import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{ JoinGroupRequest, TransactionResult }
+import org.apache.kafka.common.requests.JoinGroupRequest
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{ After, Before, Test }
@@ -117,7 +117,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
verifyConcurrentRandomSequences(createGroupMembers, allOperationsWithTxn)
}
-
abstract class GroupOperation[R, C] extends Operation {
val responseFutures = new ConcurrentHashMap[GroupMember, Future[R]]()
@@ -228,8 +227,17 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
val producerId = 1000L
val producerEpoch : Short = 2
+ // When transaction offsets are appended to the log, transactions may be scheduled for
+ // completion. Since group metadata locks are acquired for transaction completion, include
+ // this in the callback to test that there are no deadlocks.
+ def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit = {
+ val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
+ groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
+ offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
+ responseCallback(errors)
+ }
groupCoordinator.handleTxnCommitOffsets(member.group.groupId,
- producerId, producerEpoch, offsets, responseCallback)
+ producerId, producerEpoch, offsets, callbackWithTxnCompletion)
}
}
@@ -241,19 +249,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
override def runWithCallback(member: GroupMember, responseCallback: CompleteTxnCallback): Unit = {
val producerId = 1000L
val offsetsPartitions = (0 to numPartitions).map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
- groupCoordinator.handleTxnCompletion(producerId, offsetsPartitions, transactionResult(member.group.groupId))
+ groupCoordinator.groupManager.handleTxnCompletion(producerId,
+ offsetsPartitions.map(_.partition).toSet, isCommit = random.nextBoolean)
responseCallback(Errors.NONE)
}
override def awaitAndVerify(member: GroupMember): Unit = {
val error = await(member, 500)
assertEquals(Errors.NONE, error)
}
- // Test both commit and abort. Group ids used in the test have the format <prefix><index>
- // Use the last digit of the index to decide between commit and abort.
- private def transactionResult(groupId: String): TransactionResult = {
- val lastDigit = groupId(groupId.length - 1).toInt
- if (lastDigit % 2 == 0) TransactionResult.COMMIT else TransactionResult.ABORT
- }
}
class LeaveGroupOperation extends GroupOperation[LeaveGroupCallbackParams, LeaveGroupCallback] {
@@ -273,7 +276,6 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
object GroupCoordinatorConcurrencyTest {
-
type JoinGroupCallback = JoinGroupResult => Unit
type SyncGroupCallbackParams = (Array[Byte], Errors)
type SyncGroupCallback = (Array[Byte], Errors) => Unit
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2c9e81d..8529bf9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -841,7 +841,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
// Send commit marker.
- groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+ handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
// Validate that committed offset is materialized.
val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -866,7 +866,7 @@ class GroupCoordinatorTest extends JUnitSuite {
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
// Validate that the pending commit is discarded.
- groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+ handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
assertEquals(Errors.NONE, secondReqError)
@@ -888,14 +888,14 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
- groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+ handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
assertEquals(Errors.NONE, secondReqError)
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
// Ignore spurious commit.
- groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+ handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
assertEquals(Errors.NONE, thirdReqError)
@@ -932,7 +932,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
// We got a commit for only one __consumer_offsets partition. We should only materialize its group offsets.
- groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+ handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
case (error, partData) =>
errors.append(error)
@@ -958,7 +958,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset))
// Now we receive the other marker.
- groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+ handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
errors.clear()
partitionData.clear()
groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match {
@@ -1008,7 +1008,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
// producer0 commits its transaction.
- groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+ handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
case (error, partData) =>
errors.append(error)
@@ -1023,7 +1023,7 @@ class GroupCoordinatorTest extends JUnitSuite {
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset))
// producer1 now commits its transaction.
- groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+ handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match {
case (error, partData) =>
@@ -1629,4 +1629,11 @@ class GroupCoordinatorTest extends JUnitSuite {
Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
}
+ def handleTxnCompletion(producerId: Long,
+ offsetsPartitions: Iterable[TopicPartition],
+ transactionResult: TransactionResult): Unit = {
+ val isCommit = transactionResult == TransactionResult.COMMIT
+ groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
+ }
+
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.