You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/24 18:26:00 UTC

[GitHub] [kafka] mumrah opened a new pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

mumrah opened a new pull request #10752:
URL: https://github.com/apache/kafka/pull/10752


   This is part 2 of [KIP-730](https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode), part 1 was in #10504.
   
   This PR adds support on the KRaft controller for handling AllocateProducerIDs requests and managing the state of the latest producer ID block in the controller and committing this state to the metadata log.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r639928477



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1199,6 +1213,22 @@ public void processBatchEndOffset(long offset) {
         });
     }
 
+    @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+            AllocateProducerIdsRequestData request) {
+        return appendWriteEvent("allocateProducerIds",
+            () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
+                .thenApply(resultOrError -> {

Review comment:
       I can see the reasoning behind doing it this way, but for all the other RPCs we've just been letting the future complete as an error, and making the caller handle it.  One issue with changing the pattern, I suppose, is that not all the controller functions return an RPC data structure that allows setting an error. So let's keep the current pattern here for now, where caller has to handle the future completing exceptionally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r639919977



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 import java.net.InetAddress
-

Review comment:
       Can we leave this blank line? It seems like the standard is to separate out Java includes. Let's not ping/pong on this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r639922279



##########
File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -259,6 +260,10 @@ class BrokerMetadataListener(
     clientQuotaManager.handleQuotaRecord(record)
   }
 
+  def handleProducerIdRecord(record: ProducerIdsRecord): Unit = {
+    // no-op

Review comment:
       Can you add a short description of why this is a no-op? For example, "Brokers get their IDs from the RPC, so we don't need this" or something like that. Otherwise it looks like a TODO (I know that's not the intention)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#issuecomment-848944712


   Thanks for this, @mumrah . Very clean PR overall.
   
   Can you add a test that the RPC fails if we don't have cluster authorization, in `ControllerApisTest.scala`? Should be very similar to `testUnauthorizedHandleAlterPartitionReassignments`. I think it's a good idea to have something like this for all the controller RPCs to prevent authorization regressions.
   
   Also, QuorumControllerTest#testSnapshotSaveAndLoad needs to be updated... it's failing now in the PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10752:
URL: https://github.com/apache/kafka/pull/10752


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r640070688



##########
File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
##########
@@ -1199,6 +1213,22 @@ public void processBatchEndOffset(long offset) {
         });
     }
 
+    @Override
+    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
+            AllocateProducerIdsRequestData request) {
+        return appendWriteEvent("allocateProducerIds",
+            () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
+                .thenApply(resultOrError -> {

Review comment:
       Sounds fine. I'll change the return type of generateNextProducerId to just be `ControllerResult<ProducerIdBlock>` and throw ApiExceptions directly instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r639919437



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -246,9 +245,16 @@ class BrokerServer(
 
       // Create transaction coordinator, but don't start it until we've started replica manager.

Review comment:
       This comment makes more sense if positioned next to the line that initializes the `transactionCoordinator`, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10752:
URL: https://github.com/apache/kafka/pull/10752#discussion_r639938409



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
 import java.net.InetAddress
-

Review comment:
       Sure, I didn't meant to remove this (at least, I don't think I did)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10752: KAFKA-12620 Allocate Producer IDs in KRaft controller

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10752:
URL: https://github.com/apache/kafka/pull/10752


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org