Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/14 11:38:39 UTC

[GitHub] [kafka] dajac opened a new pull request, #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

dajac opened a new pull request, #12848:
URL: https://github.com/apache/kafka/pull/12848

   This patch adds `heartbeat` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
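
   As a rough illustration of the change described above, the following is a minimal, self-contained Scala sketch of a future-based heartbeat call and a handler that consumes it. It is not the code from this patch. `GroupCoordinatorSketch`, `InMemoryCoordinator`, `HeartbeatHandlerSketch`, the two case classes, and the `sendResponse` parameter are all hypothetical stand-ins; only the handler's `CompletableFuture[Unit]` return type is taken from the diff in this PR.

```scala
import java.util.concurrent.CompletableFuture

// Hypothetical, simplified stand-ins for Kafka's request and response data classes.
final case class HeartbeatRequestData(groupId: String, memberId: String, generationId: Int)
final case class HeartbeatResponseData(errorCode: Short, throttleTimeMs: Int = 0)

// Hypothetical shape of a future-based coordinator interface (not the real signature).
trait GroupCoordinatorSketch {
  def heartbeat(request: HeartbeatRequestData): CompletableFuture[HeartbeatResponseData]
}

// Toy coordinator: known members succeed (code 0); others get 25,
// the value Kafka's protocol assigns to UNKNOWN_MEMBER_ID.
final class InMemoryCoordinator(members: Set[String]) extends GroupCoordinatorSketch {
  override def heartbeat(request: HeartbeatRequestData): CompletableFuture[HeartbeatResponseData] = {
    val code = if (members.contains(request.memberId)) 0 else 25
    CompletableFuture.completedFuture(HeartbeatResponseData(code.toShort))
  }
}

object HeartbeatHandlerSketch {
  // Placeholder error code for unexpected failures (an assumption of this sketch).
  val UnknownServerError: Short = (-1).toShort

  // Returns a future that completes once a response has been handed to `sendResponse`,
  // instead of passing a callback into the coordinator.
  def handleHeartbeatRequest(
    coordinator: GroupCoordinatorSketch,
    request: HeartbeatRequestData,
    sendResponse: HeartbeatResponseData => Unit
  ): CompletableFuture[Unit] = {
    coordinator.heartbeat(request).handle[Unit] { (response, exception) =>
      if (exception != null) sendResponse(HeartbeatResponseData(UnknownServerError))
      else sendResponse(response)
    }
  }
}
```

   In this sketch the caller decides how the response is delivered by passing `sendResponse`, and can chain on or wait for the returned future, which makes the handler straightforward to exercise in a unit test with a stub coordinator.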
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1035647825


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2806,6 +2806,100 @@ class KafkaApisTest {
     }
   }
 
+  @ParameterizedTest

Review Comment:
   Yeah... At least no unit tests here.





[GitHub] [kafka] dajac commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1037177201


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."

Review Comment:
   I agree that it makes sense to remove this trace log, since we already have the request log. Let me remove these trace logs everywhere.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1036608643


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."

Review Comment:
   Can we drop this? If trace is enabled, we should have request logging, which is more verbose.





[GitHub] [kafka] dajac merged pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac merged PR #12848:
URL: https://github.com/apache/kafka/pull/12848




[GitHub] [kafka] dajac commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
dajac commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1037178096


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   I may address this separately as I have a +1 from Jason for this PR.





[GitHub] [kafka] jolshan commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1026784250


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -2806,6 +2806,100 @@ class KafkaApisTest {
     }
   }
 
+  @ParameterizedTest

Review Comment:
   Were there previously no tests for this API? 😅





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jeffkbkim commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1036332346


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   I'm assuming we'll be rebasing to use the new `sendResponse`, right? Like in https://github.com/apache/kafka/pull/12847/commits/57406651dfa3c1536e184865002e7340b7c69d55#diff-cc056b4960ededba37a438b1454f0f3c5ff5e8ad5e6d2ec9a08e813ca056ffebR1666





[GitHub] [kafka] jolshan commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1036618463


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."

Review Comment:
   Oops. We added trace logs to all the requests.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."

Review Comment:
   Oops. We added trace logs to all the responses.





[GitHub] [kafka] jolshan commented on a diff in pull request #12848: KAFKA-14367; Add `Heartbeat` to the new `GroupCoordinator` interface

Posted by GitBox <gi...@apache.org>.
jolshan commented on code in PR #12848:
URL: https://github.com/apache/kafka/pull/12848#discussion_r1036350719


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1764,42 +1764,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     })
   }
 
-  def handleHeartbeatRequest(request: RequestChannel.Request): Unit = {
+  def handleHeartbeatRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
     val heartbeatRequest = request.body[HeartbeatRequest]
 
-    // the callback for sending a heartbeat response
-    def sendResponseCallback(error: Errors): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val response = new HeartbeatResponse(
-          new HeartbeatResponseData()
-            .setThrottleTimeMs(requestThrottleMs)
-            .setErrorCode(error.code))
-        trace("Sending heartbeat response %s for correlation id %d to client %s."
-          .format(response, request.header.correlationId, request.header.clientId))
+    def sendResponse(response: AbstractResponse): Unit = {
+      trace("Sending heartbeat response %s for correlation id %d to client %s."
+        .format(response, request.header.correlationId, request.header.clientId))
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {

Review Comment:
   Yeah -- I left this comment on all the other PRs. David said he would rebase.


