You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "jeffkbkim (via GitHub)" <gi...@apache.org> on 2023/02/14 01:05:37 UTC

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

jeffkbkim commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1105130904


##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     private final AddPartitionsToTxnRequestData data;
 
     private List<TopicPartition> cachedPartitions = null;
+    
+    private Map<String, List<TopicPartition>> cachedPartitionsByTransaction = null;
+
+    private final short version;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
         public final AddPartitionsToTxnRequestData data;
+        public final boolean isClientRequest;
 
-        public Builder(final AddPartitionsToTxnRequestData data) {
+        // Only used for versions < 4
+        public Builder(String transactionalId,
+                       long producerId,
+                       short producerEpoch,
+                       List<TopicPartition> partitions) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.data = data;
+            this.isClientRequest = true;
+
+            AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+            this.data = new AddPartitionsToTxnRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setProducerId(producerId)
+                    .setProducerEpoch(producerEpoch)
+                    .setTopics(topics);
         }
 
-        public Builder(final String transactionalId,
-                       final long producerId,
-                       final short producerEpoch,
-                       final List<TopicPartition> partitions) {
+        public Builder(AddPartitionsToTxnTransactionCollection transactions,
+                       boolean verifyOnly) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+            this.isClientRequest = false;
 
+            this.data = new AddPartitionsToTxnRequestData()
+                    .setTransactions(transactions)

Review Comment:
   same on the indentation



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     private final AddPartitionsToTxnRequestData data;
 
     private List<TopicPartition> cachedPartitions = null;
+    
+    private Map<String, List<TopicPartition>> cachedPartitionsByTransaction = null;
+
+    private final short version;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
         public final AddPartitionsToTxnRequestData data;
+        public final boolean isClientRequest;
 
-        public Builder(final AddPartitionsToTxnRequestData data) {
+        // Only used for versions < 4
+        public Builder(String transactionalId,
+                       long producerId,
+                       short producerEpoch,
+                       List<TopicPartition> partitions) {
             super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-            this.data = data;
+            this.isClientRequest = true;
+
+            AddPartitionsToTxnTopicCollection topics = compileTopics(partitions);
+
+            this.data = new AddPartitionsToTxnRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setProducerId(producerId)
+                    .setProducerEpoch(producerEpoch)
+                    .setTopics(topics);

Review Comment:
   the indentation looks off here. i think intellij does this to me too



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -101,15 +130,61 @@ public String toString() {
     public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
         this.data = data;
+        this.version = version;
     }
-
+    
+    // Only used for versions < 4
     public List<TopicPartition> partitions() {
         if (cachedPartitions != null) {
             return cachedPartitions;
         }
         cachedPartitions = Builder.getPartitions(data);
         return cachedPartitions;
     }
+    
+    private List<TopicPartition> partitionsForTransaction(String transaction) {
+        if (cachedPartitionsByTransaction == null) {
+            cachedPartitionsByTransaction = new HashMap<>();
+        }
+        
+        return cachedPartitionsByTransaction.computeIfAbsent(transaction, txn -> {
+            List<TopicPartition> partitions = new ArrayList<>();
+            for (AddPartitionsToTxnTopic topicCollection : data.transactions().find(txn).topics()) {
+                for (Integer partition : topicCollection.partitions()) {
+                    partitions.add(new TopicPartition(topicCollection.name(), partition));
+                }
+            }
+            return partitions;
+        });
+    }
+    
+    public Map<String, List<TopicPartition>> partitionsByTransaction() {
+        if (cachedPartitionsByTransaction != null && cachedPartitionsByTransaction.size() == data.transactions().size()) {

Review Comment:
   can there be any case where the two maps have the same size but store different items?



##########
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java:
##########
@@ -84,16 +88,59 @@ public void testParse() {
 
         topicCollection.add(topicResult);
 
-        AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData()
-                                                  .setResults(topicCollection)
-                                                  .setThrottleTimeMs(throttleTimeMs);
-        AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data);
-
         for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {

Review Comment:
   can we use the parameterized test for the api versions?



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java:
##########
@@ -49,28 +52,37 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
     private final AddPartitionsToTxnResponseData data;
 
     private Map<TopicPartition, Errors> cachedErrorsMap = null;
+    
+    private Map<String, Map<TopicPartition, Errors>> cachedAllErrorsMap = null;
 
     public AddPartitionsToTxnResponse(AddPartitionsToTxnResponseData data) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN);
         this.data = data;
     }
 
+    // Only used for versions < 4
     public AddPartitionsToTxnResponse(int throttleTimeMs, Map<TopicPartition, Errors> errors) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN);
 
+        this.data = new AddPartitionsToTxnResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setResults(topicCollectionForErrors(errors));
+    }
+    
+    private static AddPartitionsToTxnTopicResultCollection topicCollectionForErrors(Map<TopicPartition, Errors> errors) {
         Map<String, AddPartitionsToTxnPartitionResultCollection> resultMap = new HashMap<>();
-
+        

Review Comment:
   nit: maybe just a newline?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##########
@@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
               // this is an optimization: if the partitions are already in the metadata reply OK immediately
               Left(Errors.NONE)
             } else {
-              Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
+              // If verifyOnly, we should have returned in the step above. If we didn't the partitions are not present in the transaction.

Review Comment:
   Does this comment mean that if the request was verifyOnly, we should have already returned Left(Errors.NONE), because there should have been a matching transaction containing all of the partitions?



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
     private final AddPartitionsToTxnRequestData data;
 
     private List<TopicPartition> cachedPartitions = null;
+    
+    private Map<String, List<TopicPartition>> cachedPartitionsByTransaction = null;
+
+    private final short version;
 
     public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
         public final AddPartitionsToTxnRequestData data;
+        public final boolean isClientRequest;
 
-        public Builder(final AddPartitionsToTxnRequestData data) {
+        // Only used for versions < 4

Review Comment:
   Does this mean this constructor is only used for versions less than 4? How are we enforcing this?



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -101,15 +130,61 @@ public String toString() {
     public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
         this.data = data;
+        this.version = version;
     }
-
+    
+    // Only used for versions < 4
     public List<TopicPartition> partitions() {
         if (cachedPartitions != null) {
             return cachedPartitions;
         }
         cachedPartitions = Builder.getPartitions(data);
         return cachedPartitions;
     }
+    
+    private List<TopicPartition> partitionsForTransaction(String transaction) {

Review Comment:
   are we using this return type anywhere?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (config.interBrokerProtocolVersion.isLessThan(version))
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-    val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedPartitions = mutable.Set[TopicPartition]()
-
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
-        partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-      for (topicPartition <- partitionsToAdd) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedPartitions.add(topicPartition)
+    val lock = new Object
+    val addPartitionsToTxnRequest = if (request.context.apiVersion() < 4) request.body[AddPartitionsToTxnRequest].normalizeRequest() else request.body[AddPartitionsToTxnRequest]
+    val version = addPartitionsToTxnRequest.version
+    val responses = new AddPartitionsToTxnResultCollection()
+    val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction()
+    
+    // Newer versions of the request should only come from other brokers.
+    if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    // V4 requests introduced batches of transactions. We need all transactions to be handled before sending the 
+    // response so there are a few differences in handling errors and sending responses.
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
+      if (version < 4) {
+        // There will only be one response in data. Add it to the response data object.
+        val data = new AddPartitionsToTxnResponseData()
+        responses.forEach(result => {
+          data.setResults(result.topicResults())
+          data.setThrottleTimeMs(requestThrottleMs)
+        })
+        new AddPartitionsToTxnResponse(data)
+      } else {
+        new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
       }
+    }
 
-      if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) {
-        // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
-        // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
-        // the authorization check to indicate that they were not added to the transaction.
-        val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++
-          authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
+    val txns = addPartitionsToTxnRequest.data.transactions
+    def maybeSendResponse(): Unit = {
+      lock synchronized {
+        if (responses.size() == txns.size()) {
+          requestHelper.sendResponseMaybeThrottle(request, createResponse)

Review Comment:
   do we need to hold the lock when sending the response?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (config.interBrokerProtocolVersion.isLessThan(version))
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-    val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedPartitions = mutable.Set[TopicPartition]()
-
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
-        partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-      for (topicPartition <- partitionsToAdd) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedPartitions.add(topicPartition)
+    val lock = new Object
+    val addPartitionsToTxnRequest = if (request.context.apiVersion() < 4) request.body[AddPartitionsToTxnRequest].normalizeRequest() else request.body[AddPartitionsToTxnRequest]

Review Comment:
   nit: I think we can split the if/else here across separate lines



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (config.interBrokerProtocolVersion.isLessThan(version))
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-    val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedPartitions = mutable.Set[TopicPartition]()
-
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
-        partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-      for (topicPartition <- partitionsToAdd) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedPartitions.add(topicPartition)
+    val lock = new Object
+    val addPartitionsToTxnRequest = if (request.context.apiVersion() < 4) request.body[AddPartitionsToTxnRequest].normalizeRequest() else request.body[AddPartitionsToTxnRequest]
+    val version = addPartitionsToTxnRequest.version
+    val responses = new AddPartitionsToTxnResultCollection()
+    val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction()
+    
+    // Newer versions of the request should only come from other brokers.
+    if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    // V4 requests introduced batches of transactions. We need all transactions to be handled before sending the 
+    // response so there are a few differences in handling errors and sending responses.
+    def createResponse(requestThrottleMs: Int): AbstractResponse = {
+      if (version < 4) {
+        // There will only be one response in data. Add it to the response data object.
+        val data = new AddPartitionsToTxnResponseData()
+        responses.forEach(result => {
+          data.setResults(result.topicResults())
+          data.setThrottleTimeMs(requestThrottleMs)
+        })
+        new AddPartitionsToTxnResponse(data)
+      } else {
+        new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
       }
+    }
 
-      if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) {
-        // Any failed partition check causes the entire request to fail. We send the appropriate error codes for the
-        // partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded
-        // the authorization check to indicate that they were not added to the transaction.
-        val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++
-          authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava))
+    val txns = addPartitionsToTxnRequest.data.transactions
+    def maybeSendResponse(): Unit = {
+      lock synchronized {
+        if (responses.size() == txns.size()) {
+          requestHelper.sendResponseMaybeThrottle(request, createResponse)
+        }
+      }
+    }
+
+    txns.forEach( transaction => {
+      val transactionalId = transaction.transactionalId()

Review Comment:
   nit: parentheses



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (config.interBrokerProtocolVersion.isLessThan(version))
       throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     ensureInterBrokerVersion(IBP_0_11_0_IV0)
-    val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-    val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-    val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-    if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId))
-      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-        addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-    else {
-      val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-      val authorizedPartitions = mutable.Set[TopicPartition]()
-
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC,
-        partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-      for (topicPartition <- partitionsToAdd) {
-        if (!authorizedTopics.contains(topicPartition.topic))
-          unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION
-        else
-          authorizedPartitions.add(topicPartition)
+    val lock = new Object

Review Comment:
   any reason we don't use `val responses` as the synchronization object?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala:
##########
@@ -302,17 +302,42 @@ class TransactionCoordinatorTest {
       any()
     )
   }
-
+  

Review Comment:
   nit: indentation



##########
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##########
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
             AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
             for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
                 topics.add(new AddPartitionsToTxnTopic()
-                               .setName(partitionEntry.getKey())
-                               .setPartitions(partitionEntry.getValue()));
+                        .setName(partitionEntry.getKey())
+                        .setPartitions(partitionEntry.getValue()));

Review Comment:
   i'm seeing a bunch of places with double indentation - not sure if it's intentional or it's done by your editor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org