Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/08 18:02:45 UTC

[GitHub] [kafka] mumrah opened a new pull request #10504: KAFKA-12620 Allocate producer ids on the controller

mumrah opened a new pull request #10504:
URL: https://github.com/apache/kafka/pull/10504


   This is part of the implementation for [KIP-730](https://cwiki.apache.org/confluence/display/KAFKA/KIP-730%3A+Producer+ID+generation+in+KRaft+mode), which is still under discussion. The details of the implementation may change to reflect changes in the KIP.
   
   **DO NOT MERGE**
   
   This change adds a new AllocateProducerIds RPC, which a broker uses to request a block of producer IDs from the controller. This PR only includes the changes for the broker and the ZooKeeper-based controller.
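
   As a rough illustration of the allocation scheme (not code from this PR), the sketch below models a controller handing out contiguous, non-overlapping blocks that brokers then consume locally. The block size of 1000 and all names here are assumptions for illustration only.

```scala
// Toy model of producer ID block allocation; mirrors the start/len/end shape of ProducerIdsBlock.
final case class Block(assignedBrokerId: Int, start: Long, len: Long) {
  def end: Long = start + len - 1   // inclusive end, like producerIdEnd in the PR
}

class ToyBlockAllocator(blockSize: Long = 1000L) {
  private var nextStart = 0L
  // Each request receives the next contiguous block, so two brokers never share an ID.
  def allocate(brokerId: Int): Block = synchronized {
    val block = Block(brokerId, nextStart, blockSize)
    nextStart += blockSize
    block
  }
}

// e.g. allocate(1) == Block(1, 0, 1000); allocate(2) == Block(2, 1000, 1000)
```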


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r636104630



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {

Review comment:
       I don't actually need to set it to `currentProducerIdBlock.producerIdStart` since it's already initialized to -1 above. I'll remove that. 
   
   I'll add the `synchronized` here as well







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630569991



##########
File path: clients/src/main/resources/common/message/AllocateProducerIdsResponse.json
##########
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 67,
+  "type": "response",
+  "name": "AllocateProducerIdsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top level response error code" },
+    { "name": "ProducerIdStart", "type": "int64", "versions": "0+",

Review comment:
       I think this should have entity type `producerId`







[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-839261742


   We need to gate the new behavior behind an IBP bump, right?  Otherwise we won't have access to new producer IDs while rolling a cluster.
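
   A hedged sketch of what such a gate could look like (toy types and names; the actual check appears later in this thread as `config.interBrokerProtocolVersion.isAllocateProducerIdsSupported`): until every broker is on an IBP that supports the new RPC, producer IDs keep coming from the ZooKeeper path, so they remain available mid-roll.

```scala
object IbpGateSketch {
  // Stub generators standing in for the two implementations discussed in this PR.
  trait ProducerIdGenerator { def generateProducerId(): Long }
  final class ZkBackedGenerator extends ProducerIdGenerator { def generateProducerId(): Long = 0L }         // old ZK path (stub)
  final class ControllerBackedGenerator extends ProducerIdGenerator { def generateProducerId(): Long = 0L } // new RPC path (stub)

  // Gate the new behavior on the inter-broker protocol version.
  def chooseGenerator(ibpSupportsAllocateProducerIds: Boolean): ProducerIdGenerator =
    if (ibpSupportsAllocateProducerIds) new ControllerBackedGenerator else new ZkBackedGenerator
}
```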





[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635532912



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
+                          callback: AllocateProducerIdsResponseData => Unit): Unit = {
+
+    def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
+      results match {
+        case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
+        case Right(pidBlock) => callback.apply(
+          new AllocateProducerIdsResponseData()
+            .setProducerIdStart(pidBlock.producerIdStart())
+            .setProducerIdLen(pidBlock.producerIdLen()))
+      }
+    }
+    eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+      allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Left(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+      callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
       The quorum controller and the zk controller are two implementations of the same API protocol. It is always fine (in fact, it's necessary) to reuse error codes for both where appropriate.







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635585021



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -330,10 +332,21 @@ class KafkaServer(
         groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
         groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
 
+        /* create producer ids manager */
+        val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
+          ProducerIdGenerator(

Review comment:
       It seems a bit weird that both function calls have the same name. Can't we just create the subclass we want directly here, for clarity? One is ZkWhatever, the other is ForwardingWhatever, etc.







[GitHub] [kafka] mumrah merged pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #10504:
URL: https://github.com/apache/kafka/pull/10504


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635572503



##########
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##########
@@ -248,5 +261,17 @@ public void stop() {
                 clusterReference.get().tearDown();
             }
         }
+
+        @Override
+        public void rollingBrokerRestart() {
+            if (started.get()) {
+                for (int i = 0; i < clusterReference.get().brokerCount(); i++) {
+                    clusterReference.get().killBroker(i);
+                }
+                clusterReference.get().restartDeadBrokers(true);
+            } else {

Review comment:
       It would be a bit nicer to move this section to the front, to get rid of the nesting here.
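
       A sketch of the early-exit shape being suggested (toy signatures, written in Scala to match the rest of this thread; the real method is Java in ZkClusterInvocationContext, and the not-started branch is not shown in the quoted hunk, so the exception here is an assumption):

```scala
object RollingRestartSketch {
  // Guard clause first, so the happy path is not nested inside an if/else.
  def rollingBrokerRestart(started: Boolean, brokerCount: Int,
                           killBroker: Int => Unit, restartDeadBrokers: Boolean => Unit): Unit = {
    if (!started)
      throw new IllegalStateException("Cluster has not been started")  // assumed failure behavior

    (0 until brokerCount).foreach(killBroker)  // roll each broker in turn
    restartDeadBrokers(true)
  }
}
```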







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635572907



##########
File path: core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
##########
@@ -248,5 +261,17 @@ public void stop() {
                 clusterReference.get().tearDown();
             }
         }
+
+        @Override
+        public void rollingBrokerRestart() {
+            if (started.get()) {
+                for (int i = 0; i < clusterReference.get().brokerCount(); i++) {

Review comment:
       It seems like there should be a log message here before we kill the broker, to make it easier to see what is going on?







[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-846213905


   > Thanks for grabbing the output @cmccabe. It's possible that something other than the test is getting producer IDs during the test, or perhaps there is an occasional ZK race or other error causing a new block to be allocated. I think I'll just loosen the test case to only check for the expected number of unique producer IDs (which is really what we care about).
   
   Yeah, I think it happens if an RPC response is dropped (which could happen under high load conditions, like in Jenkins), but it should not be a problem, since we'll just retry.
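
   A minimal illustration of the loosened assertion described above (toy data, not the actual test code): uniqueness is the property that matters, even if retries cause more than one block to be allocated.

```scala
object UniqueIdCheckSketch extends App {
  // Toy data standing in for producer IDs collected by the test; the gap models a second block.
  val ids: Seq[Long] = Seq(0L, 1L, 2L, 1000L, 1001L)
  assert(ids.distinct.size == ids.size, "producer IDs must be unique even if extra blocks were allocated")
}
```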





[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r630587756



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
+                          callback: AllocateProducerIdsResponseData => Unit): Unit = {
+
+    def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
+      results match {
+        case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
+        case Right(pidBlock) => callback.apply(
+          new AllocateProducerIdsResponseData()
+            .setProducerIdStart(pidBlock.producerIdStart())
+            .setProducerIdLen(pidBlock.producerIdLen()))
+      }
+    }
+    eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+      allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Left(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+      callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
       It seems like this should be BROKER_ID_NOT_REGISTERED







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635610364



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {
+    currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
   }
 
   def generateProducerId(): Long = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
-      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
         getNewProducerIdBlock()
-        nextProducerId = currentProducerIdBlock.blockStartId + 1
+        nextProducerId = currentProducerIdBlock.producerIdStart + 1
       } else {
         nextProducerId += 1
       }
-
       nextProducerId - 1
     }
   }
+}
+
+class ProducerIdManager(brokerId: Int,
+                        brokerEpochSupplier: () => Long,
+                        controllerChannel: BrokerToControllerChannelManager,
+                        maxWaitMs: Int) extends ProducerIdGenerator with Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val requestInFlight = new AtomicBoolean(false)
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  override def generateProducerId(): Long = {

Review comment:
       To enlarge on the previous comment a bit: I guess the reason I consider a condition variable simpler is that it involves fewer locks. A blocking queue has its own lock, separate from the lock in the manager, which can be awkward.
   
   What kind of behavior do you want on error? Do you want to deliver a given error to all waiters, or just to a single waiter? It seems like you might need to deliver it to all of them, to prevent huge pile-ups in cases where there are such errors.
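
    A minimal sketch of the wait/notifyAll shape being discussed (illustrative only: the class name, timeout handling, and error propagation are assumptions, not this PR's code). Publishing the result under the lock and calling `notifyAll()` lets every waiter observe the same block or the same error.

```scala
import scala.util.{Failure, Try}

// Sketch: one lock (the object's monitor) plus wait/notifyAll, no separate queue.
class BlockWaiter[T] {
  private var latest: Option[Try[T]] = None

  // Called from the RPC response handler: publish a new block, or an error, to all waiters.
  def complete(result: Try[T]): Unit = synchronized {
    latest = Some(result)
    notifyAll()   // wakes every thread blocked in awaitLatest()
  }

  // Called when the current block is exhausted; every waiter sees the same published result.
  // (A real manager would clear `latest` when it installs the new block; omitted here for brevity.)
  def awaitLatest(maxWaitMs: Long): Try[T] = synchronized {
    val deadline = System.currentTimeMillis() + maxWaitMs
    var remaining = maxWaitMs
    while (latest.isEmpty && remaining > 0) {
      wait(remaining)
      remaining = deadline - System.currentTimeMillis()
    }
    latest.getOrElse(Failure(new RuntimeException(s"No producer ID block received within ${maxWaitMs}ms")))
  }
}
```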







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635532912



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
+                          callback: AllocateProducerIdsResponseData => Unit): Unit = {
+
+    def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
+      results match {
+        case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
+        case Right(pidBlock) => callback.apply(
+          new AllocateProducerIdsResponseData()
+            .setProducerIdStart(pidBlock.producerIdStart())
+            .setProducerIdLen(pidBlock.producerIdLen()))
+      }
+    }
+    eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+      allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Left(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+      callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
       I think that because the quorum controller and the zk controller are two implementations of the same API protocol, they should share the same error codes.







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635582909



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {
+    currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
   }
 
   def generateProducerId(): Long = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
-      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
         getNewProducerIdBlock()
-        nextProducerId = currentProducerIdBlock.blockStartId + 1
+        nextProducerId = currentProducerIdBlock.producerIdStart + 1
       } else {
         nextProducerId += 1
       }
-
       nextProducerId - 1
     }
   }
+}
+
+class ProducerIdManager(brokerId: Int,
+                        brokerEpochSupplier: () => Long,
+                        controllerChannel: BrokerToControllerChannelManager,
+                        maxWaitMs: Int) extends ProducerIdGenerator with Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val requestInFlight = new AtomicBoolean(false)
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  override def generateProducerId(): Long = {

Review comment:
       It would be better to use a condition variable rather than a blocking queue here. Consider the situation where the latest RPC call got an error. In that case, you want to deliver the error to all waiters, not just to one of them. You could do that with `notifyAll()`. You can't really do it with a blocking queue because there's 1 consumer and 1 producer.
   
   Removing the blocking queue also avoids using extra locks and so on. We only really need one lock and one condition variable here, I think. (wait / notify should be fine here, no need for ReentrantLock since we don't need multiple cvars).







[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-845532005


   I'm OK with moving the condition variable refactor to a follow-on PR. Similarly, I see that you don't have any logger in ZkClusterInvocationContext.java, so we can hold off on that for now.
   
   However, it looks like `kafka.coordinator.transaction.ProducerIdsIntegrationTest` is failing here. @mumrah can you take a look?





[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r636104630



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {

Review comment:
       Edit: most of this code was copied from the existing logic. Let me figure out what's going on with this initialization; it is a little strange.
   
   Also, I'll add the `synchronized` here as well







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r637189573



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {
+    currentProducerIdBlock = ZkProducerIdManager.getNewProducerIdBlock(brokerId, zkClient, this)
   }
 
   def generateProducerId(): Long = {
     this synchronized {
       // grab a new block of producerIds if this block has been exhausted
-      if (nextProducerId > currentProducerIdBlock.blockEndId) {
+      if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
         getNewProducerIdBlock()
-        nextProducerId = currentProducerIdBlock.blockStartId + 1
+        nextProducerId = currentProducerIdBlock.producerIdStart + 1
       } else {
         nextProducerId += 1
       }
-
       nextProducerId - 1
     }
   }
+}
+
+class ProducerIdManager(brokerId: Int,
+                        brokerEpochSupplier: () => Long,
+                        controllerChannel: BrokerToControllerChannelManager,
+                        maxWaitMs: Int) extends ProducerIdGenerator with Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private val nextProducerIdBlock = new ArrayBlockingQueue[Try[ProducerIdsBlock]](1)
+  private val requestInFlight = new AtomicBoolean(false)
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  override def generateProducerId(): Long = {

Review comment:
       Let's do this in a follow-on







[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r634580042



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -2376,6 +2375,82 @@ class KafkaController(val config: KafkaConfig,
     }
   }
 
+  def allocateProducerIds(allocateProducerIdsRequest: AllocateProducerIdsRequestData,
+                          callback: AllocateProducerIdsResponseData => Unit): Unit = {
+
+    def eventManagerCallback(results: Either[Errors, ProducerIdsBlock]): Unit = {
+      results match {
+        case Left(error) => callback.apply(new AllocateProducerIdsResponseData().setErrorCode(error.code))
+        case Right(pidBlock) => callback.apply(
+          new AllocateProducerIdsResponseData()
+            .setProducerIdStart(pidBlock.producerIdStart())
+            .setProducerIdLen(pidBlock.producerIdLen()))
+      }
+    }
+    eventManager.put(AllocateProducerIds(allocateProducerIdsRequest.brokerId,
+      allocateProducerIdsRequest.brokerEpoch, eventManagerCallback))
+  }
+
+  def processAllocateProducerIds(brokerId: Int, brokerEpoch: Long, callback: Either[Errors, ProducerIdsBlock] => Unit): Unit = {
+    // Handle a few short-circuits
+    if (!isActive) {
+      callback.apply(Left(Errors.NOT_CONTROLLER))
+      return
+    }
+
+    val brokerEpochOpt = controllerContext.liveBrokerIdAndEpochs.get(brokerId)
+    if (brokerEpochOpt.isEmpty) {
+      warn(s"Ignoring AllocateProducerIds due to unknown broker $brokerId")
+      callback.apply(Left(Errors.STALE_BROKER_EPOCH))

Review comment:
       This error code was added for the quorum controller. I suppose it's fine to use with the ZK controller. WDYT?







[GitHub] [kafka] mumrah commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r636104630



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {

Review comment:
       double edit: it has to do with `nextProducerId` leading ahead by one and how we check whether to allocate the next block. I'll clean up this code to make it clearer.
   
   Also, I'll add the `synchronized` here as well
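
   Roughly what that cleanup might look like (an illustrative sketch only, reusing the field names from the quoted diff; not the PR's final code):

   ```scala
   // Sketch: nextProducerId always points at the next unused id, so there is no
   // "leading ahead by one" bookkeeping, and a new block is fetched (under the
   // same lock) only once the current block is exhausted.
   def generateProducerId(): Long = this.synchronized {
     if (nextProducerId > currentProducerIdBlock.producerIdEnd) {
       getNewProducerIdBlock()
       nextProducerId = currentProducerIdBlock.producerIdStart
     }
     val id = nextProducerId
     nextProducerId += 1
     id
   }
   ```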







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635578464



##########
File path: core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
##########
@@ -75,87 +61,179 @@ trait ProducerIdGenerator {
   def shutdown() : Unit = {}
 }
 
-class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
-
-  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
-
-  private var currentProducerIdBlock: ProducerIdBlock = null
-  private var nextProducerId: Long = -1L
-
-  // grab the first block of producerIds
-  this synchronized {
-    getNewProducerIdBlock()
-    nextProducerId = currentProducerIdBlock.blockStartId
-  }
-
-  private def getNewProducerIdBlock(): Unit = {
+object ZkProducerIdManager {
+  def getNewProducerIdBlock(brokerId: Int, zkClient: KafkaZkClient, logger: Logging): ProducerIdsBlock = {
+    // Get or create the existing PID block from ZK and attempt to update it. We retry in a loop here since other
+    // brokers may be generating PID blocks during a rolling upgrade
     var zkWriteComplete = false
     while (!zkWriteComplete) {
       // refresh current producerId block from zookeeper again
       val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ProducerIdBlockZNode.path)
 
       // generate the new producerId block
-      currentProducerIdBlock = dataOpt match {
+      val newProducerIdBlock = dataOpt match {
         case Some(data) =>
-          val currProducerIdBlock = ProducerIdManager.parseProducerIdBlockData(data)
-          debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
+          val currProducerIdBlock = ProducerIdBlockZNode.parseProducerIdBlockData(data)
+          logger.debug(s"Read current producerId block $currProducerIdBlock, Zk path version $zkVersion")
 
-          if (currProducerIdBlock.blockEndId > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+          if (currProducerIdBlock.producerIdEnd > Long.MaxValue - ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE) {
             // we have exhausted all producerIds (wow!), treat it as a fatal error
-            fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.blockEndId})")
+            logger.fatal(s"Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is ${currProducerIdBlock.producerIdEnd})")
             throw new KafkaException("Have exhausted all producerIds.")
           }
 
-          ProducerIdBlock(brokerId, currProducerIdBlock.blockEndId + 1L, currProducerIdBlock.blockEndId + ProducerIdManager.PidBlockSize)
+          new ProducerIdsBlock(brokerId, currProducerIdBlock.producerIdEnd + 1L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
         case None =>
-          debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
-          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+          logger.debug(s"There is no producerId block yet (Zk path version $zkVersion), creating the first block")
+          new ProducerIdsBlock(brokerId, 0L, ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE)
       }
 
-      val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock)
+      val newProducerIdBlockData = ProducerIdBlockZNode.generateProducerIdBlockJson(newProducerIdBlock)
 
       // try to write the new producerId block into zookeeper
-      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path,
-        newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData))
+      val (succeeded, version) = zkClient.conditionalUpdatePath(ProducerIdBlockZNode.path, newProducerIdBlockData, zkVersion, None)
       zkWriteComplete = succeeded
 
-      if (zkWriteComplete)
-        info(s"Acquired new producerId block $currentProducerIdBlock by writing to Zk with path version $version")
+      if (zkWriteComplete) {
+        logger.info(s"Acquired new producerId block $newProducerIdBlock by writing to Zk with path version $version")
+        return newProducerIdBlock
+      }
     }
+    throw new IllegalStateException()
   }
+}
 
-  private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: Array[Byte]): (Boolean, Int) = {
-    try {
-      val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData)
-      zkClient.getDataAndVersion(ProducerIdBlockZNode.path) match {
-        case (Some(data), zkVersion) =>
-          val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data)
-          (currProducerIdBLock == expectedPidBlock, zkVersion)
-        case (None, _) => (false, -1)
-      }
-    } catch {
-      case e: Exception =>
-        warn(s"Error while checking for producerId block Zk data on path $path: expected data " +
-          s"${new String(expectedData, StandardCharsets.UTF_8)}", e)
-        (false, -1)
-    }
+class ZkProducerIdManager(brokerId: Int,
+                          zkClient: KafkaZkClient) extends ProducerIdGenerator with Logging {
+
+  private var currentProducerIdBlock: ProducerIdsBlock = ProducerIdsBlock.EMPTY
+  private var nextProducerId: Long = -1L
+
+  // grab the first block of producerIds
+  this synchronized {
+    getNewProducerIdBlock()
+    nextProducerId = currentProducerIdBlock.producerIdStart
+  }
+
+  private def getNewProducerIdBlock(): Unit = {

Review comment:
       It's probably better to make this function synchronized too, to emphasize that `currentProducerIdBlock` must be modified under the lock.
   
   Also, why does the initial synchronized block set `nextProducerId` to `currentProducerIdBlock.producerIdStart`, while the `generateProducerId` function sets it to `currentProducerIdBlock.producerIdStart + 1`?







[GitHub] [kafka] cmccabe commented on a change in pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#discussion_r635585021



##########
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##########
@@ -330,10 +332,21 @@ class KafkaServer(
         groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
         groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions))
 
+        /* create producer ids manager */
+        val producerIdManager = if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported) {
+          ProducerIdGenerator(

Review comment:
       It seems a bit weird that both function calls have the same name. Can't we just create the subclass we want directly here, for clarity? One is ZkWhatever, the other is RpcWhatever, etc.
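
   Concretely, the suggestion presumably amounts to something like this in KafkaServer (a sketch; `RpcProducerIdManager` and its constructor arguments are placeholder names, while `ZkProducerIdManager(brokerId, zkClient)` matches the quoted diff):

   ```scala
   /* create producer ids manager */
   // Sketch: pick the concrete ProducerIdGenerator explicitly instead of routing
   // both cases through identically named factory methods.
   val producerIdManager: ProducerIdGenerator =
     if (config.interBrokerProtocolVersion.isAllocateProducerIdsSupported)
       new RpcProducerIdManager(config.brokerId, brokerEpochSupplier, brokerToControllerChannelManager) // placeholder ctor
     else
       new ZkProducerIdManager(config.brokerId, zkClient)
   ```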







[GitHub] [kafka] cmccabe commented on pull request #10504: KAFKA-12620 Allocate producer ids on the controller

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10504:
URL: https://github.com/apache/kafka/pull/10504#issuecomment-845535131


   The bug (which could be a test bug?) seems intermittent and didn't recur when I ran the test locally. In case Jenkins purges the information before you get a chance to look, here is the failing test and error message:
   
   Build / JDK 11 and Scala 2.13 / kafka.coordinator.transaction.ProducerIdsIntegrationTest.[2] Type=ZK, Name=testUniqueProducerIds, IBP=3.0-IV0, Security=PLAINTEXT
   
   org.opentest4j.AssertionFailedError: Expected to see 4000 in 0, 1, 2, ..., 999, 1000, 2000, 2001, ..., 2999, 3000, 5000, 5001, ..., 5999, 6000

