You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/19 00:30:06 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

hachikuji commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r426944679



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -369,6 +369,31 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
+  /**
+   * try to complete produce, fetch and delete requests if the HW of partition is incremented. Otherwise, we try to complete
+   * only delayed fetch requests.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock
+   * in order to avoid deadlock
+   * @param topicPartitions topic partition and leaderHWIncremented
+   */
+  private[this] def completeDelayedRequests(topicPartitions: Map[TopicPartition, Boolean]): Unit =
+    topicPartitions.foreach {
+      case (tp, leaderHWIncremented) =>
+        if (leaderHWIncremented) groupManager.replicaManager.completeDelayedRequests(tp)
+        else groupManager.replicaManager.completeDelayedFetchRequests(tp)
+    }
+
+  /**
+   * complete the delayed join requests associated to input group keys.
+   *
+   * Noted that this method may hold a lot of group lock so the caller should NOT hold any group lock
+   * in order to avoid deadlock
+   * @param groupKeys group keys to complete
+   */
+  private[this] def completeDelayedJoinRequests(groupKeys: Set[GroupKey]): Unit =
+    groupKeys.foreach(joinPurgatory.checkAndComplete)

Review comment:
       Hmm.. Does the group purgatory suffer from the same deadlock potential? If we call `checkAndComplete` for a group "foo," I don't think we would attempt completion for any other group.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -970,7 +970,16 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
+  /**
+   * @param completeDelayedRequests It may requires a bunch of group locks when completing delayed requests so it may
+   *                                produce deadlock if caller already holds a group lock. Hence, caller should pass
+   *                                false to disable completion and then complete the delayed requests after releasing
+   *                                held group lock
+   */
+  def appendRecordsToLeader(records: MemoryRecords,
+                            origin: AppendOrigin,
+                            requiredAcks: Int,
+                            completeDelayedRequests: Boolean): LogAppendResult = {

Review comment:
       Currently we have a somewhat convoluted model where `ReplicaManager` creates delayed operations, but we depend on lower level components like `Partition` to be aware of them and complete them. This breaks encapsulation. 
   
   Not something we should try to complete in this PR, but as an eventual goal, I think we can consider trying to factor delayed operations out of `Partition` so that they can be managed by `ReplicaManager` exclusively. If you assume that is the end state, then we could drop `completeDelayedRequests` and let `ReplicaManager` _always_ be responsible for checking delayed operations after appending to the log. 
   
   Other than `ReplicaManager`, the only caller of this method is `GroupMetadataManager` which uses it during offset expiration. I think the only reason we do this is because we didn't want to waste purgatory space. I don't think that's a good enough reason to go outside the normal flow. It would be simpler to follow the same path. Potentially we could make the callback an `Option` so that we still have a way to avoid polluting the purgatory.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -769,20 +813,25 @@ class GroupCoordinator(val brokerId: Int,
             // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
             val member = group.get(memberId)
             completeAndScheduleNextHeartbeatExpiration(group, member)
-            groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
+            partitionsToComplete ++= groupManager.storeOffsets(
+              group = group,
+              consumerId = memberId,
+              offsetMetadata = offsetMetadata,
+              responseCallback = responseCallback,
+              completeDelayedRequests = false)
 
           case CompletingRebalance =>
             // We should not receive a commit request if the group has not completed rebalance;
             // but since the consumer's member.id and generation is valid, it means it has received
             // the latest group generation information from the JoinResponse.
             // So let's return a REBALANCE_IN_PROGRESS to let consumer handle it gracefully.
             responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
-
           case _ =>
             throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
         }
       }
     }
+    completeDelayedRequests(partitionsToComplete)

Review comment:
       For reference, here are links to two alternative approaches that I considered earlier this year:
   
   - Async completion: https://github.com/hachikuji/kafka/commit/3bee4acc442f09e95d60bbb7ce9eb246310dcb63
   - Lock-safe offset cache: https://github.com/hachikuji/kafka/commit/3705f3303422915020c317dcee08d6c176977d63
   
   I think Jun was not satisfied with the first approach because it called for another thread pool. Its advantage though was a simpler and more intuitive API than what we have here. An idea which I never implemented was to let the request handlers also handle delayed operation completion so that we did not need another thread pool. Basically rather than calling the callback in `DelayedProduce` directly, we add a new operation to the request queues. Obviously this has its own tradeoffs. 
   
   The second commit tries to use lock-free data structures so that we do not need the lock when completing the callback. This was only a partial solution which handled offset commit appends, but not group metadata appends. I am not sure how to handle join group completion asynchronously, so I gave up on this idea.
   
   Only posting in case it's useful to see how some of these alternatives might have looked. I'm ok with the approach here, but I do wish we could come up with a simpler API. One thought I had is whether we could make the need for external completion more explicit. For example, maybe `appendRecords` could return some kind of object which encapsulates purgatory completion.
   
   ```scala
   val completion = inLock(lock) {
     replicaManager.appendRecords(...)
   }
   completion.run()
   ```
   Just a thought.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org