Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 00:54:07 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

hachikuji opened a new pull request #10183:
URL: https://github.com/apache/kafka/pull/10183


   This patch implements the `DescribeTransactions` API as documented in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r582219028



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {

Review comment:
       That's a fair point. The difference is that none of the other APIs are batched. I thought it was simpler to let the coordinator API work only with individual transactionalIds and leave the batching aspect to `KafkaApis`. The downside of having this potentially redundant check seems not too bad.
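
The split described in this comment can be sketched as follows. This is a minimal illustration, not the code in the PR: `BatchingSketch`, `State`, `describeOne`, and `describeAll` are hypothetical stand-ins, and error codes are shown as plain strings. The per-ID handler repeats the "active" check on every call, while the request-level handler owns the batching.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are not Kafka's actual classes.
final class BatchingSketch {
    // Simplified per-ID result: the transactional ID plus an error name.
    static final class State {
        final String transactionalId;
        final String error;

        State(String transactionalId, String error) {
            this.transactionalId = transactionalId;
            this.error = error;
        }
    }

    // Coordinator-level handler: deals with exactly one transactional ID.
    // The "active" check runs on every call, which is the potentially
    // redundant check discussed in the comment above.
    static State describeOne(String transactionalId, boolean coordinatorActive) {
        if (!coordinatorActive) {
            return new State(transactionalId, "COORDINATOR_NOT_AVAILABLE");
        }
        return new State(transactionalId, "NONE");
    }

    // Request-level handler: owns the batching and simply loops over the IDs.
    static List<State> describeAll(List<String> transactionalIds, boolean coordinatorActive) {
        List<State> states = new ArrayList<>();
        for (String id : transactionalIds) {
            states.add(describeOne(id, coordinatorActive));
        }
        return states;
    }
}
```

With this shape, an inactive coordinator yields one `COORDINATOR_NOT_AVAILABLE` entry per requested ID rather than a single request-level error.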







[GitHub] [kafka] hachikuji merged pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10183:
URL: https://github.com/apache/kafka/pull/10183


   





[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
twmb commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579


   Can the KIP be updated to change TransactionState from an int8 to a string (as implemented in this PR)?





[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r582213713



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {
+      transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+    } else if (transactionalId == null || transactionalId.isEmpty) {

Review comment:
       I agree it seems inconsistent. The protocol definition does not allow it to be nullable, but I think I added the check because the method is one step removed from the request. Perhaps I can change it to raise `IllegalArgumentException` if the `transactionalId` is null.
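
A rough sketch of the change floated here, under the assumption that a null ID is treated as a programming error while an empty ID stays a client error. `NullCheckSketch` and `validate` are hypothetical names, and error codes are shown as plain strings.

```java
// Hypothetical helper; not part of the PR.
final class NullCheckSketch {
    // Returns an error name for the given transactional ID.
    static String validate(String transactionalId) {
        if (transactionalId == null) {
            // The protocol definition does not allow null, so a null here
            // would indicate a bug in the caller rather than a bad request.
            throw new IllegalArgumentException("transactionalId must not be null");
        }
        if (transactionalId.isEmpty()) {
            // An empty ID can arrive over the wire and is reported to the client.
            return "INVALID_REQUEST";
        }
        return "NONE";
    }
}
```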







[GitHub] [kafka] dengziming commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580710886



##########
File path: clients/src/main/resources/common/message/DescribeTransactionsResponse.json
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 65,
+  "type": "response",
+  "name": "DescribeTransactionsResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+      { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+      { "name": "TransactionStates", "type": "[]TransactionState", "versions": "0+", "fields": [
+        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
+        { "name": "TransactionalId", "type": "string", "versions": "0+" },

Review comment:
       We can add `"entityType": "transactionalId"` to this field.







[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r581238657



##########
File path: clients/src/main/resources/common/message/DescribeProducersResponse.json
##########
@@ -35,7 +35,7 @@
         { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
           "about": "The partition error message, which may be null if no additional details are available" },
         { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "0+" },
+          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
           { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },

Review comment:
       The type should be `int16`. That is what we use in other APIs (e.g. `AddPartitionsToTxn` and `EndTxn`). It's been a while since I wrote this, so I'm not sure what I was thinking. Perhaps I was anticipating expansion of this field. It is probably better to be consistent since we can always bump the protocol in the future if needed. I will fix it here and consider a separate patch for `DescribeProducers`.







[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r581237146



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3271,6 +3272,34 @@ class KafkaApis(val requestChannel: RequestChannel,
       "Apache ZooKeeper mode.")
   }
 
+  def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = {
+    val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
+    val response = new DescribeTransactionsResponseData()
+
+    describeTransactionsRequest.data.transactionalIds.forEach { transactionalId =>
+      val transactionState = if (!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, transactionalId)) {
+        new DescribeTransactionsResponseData.TransactionState()
+          .setTransactionalId(transactionalId)
+          .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
+      } else {
+        txnCoordinator.handleDescribeTransactions(transactionalId)
+      }
+
+      // Include only partitions which the principal is authorized to describe

Review comment:
       Got it. Thanks for this nice explanation.







[GitHub] [kafka] twmb edited a comment on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
twmb edited a comment on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579


   Can the KIP be updated to change TransactionState from an int8 to a string (as implemented in this PR)?
   
   Alternatively, is there value in using a string vs. an int8? The string is more descriptive, but since the states are a defined enum, it'd be smaller to serialize/deserialize the int8 and then a client can perform the int8 => string conversion.
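
For illustration, the int8 alternative raised here might look like the sketch below on the client side. The state names and numeric ids are assumptions chosen for the example, not the values defined by the KIP or the broker, and `StateIdSketch` is a hypothetical class.

```java
// Hypothetical client-side mapping; names and ids are illustrative only.
final class StateIdSketch {
    enum TxnState {
        EMPTY((byte) 0),
        ONGOING((byte) 1),
        PREPARE_COMMIT((byte) 2);

        final byte id;

        TxnState(byte id) {
            this.id = id;
        }

        // Converts the single byte read from the wire back to an enum value.
        static TxnState fromId(byte id) {
            for (TxnState state : values()) {
                if (state.id == id) {
                    return state;
                }
            }
            throw new IllegalArgumentException("Unknown transaction state id: " + id);
        }
    }
}
```

The trade-off is the one stated in the comment: one byte on the wire and a client-side lookup, versus a self-describing string.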





[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
twmb commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785551804


   Also, is there a reason not to include the `LastUpdateTimestamp`? It seems pretty valuable to know how long ago the transaction state was modified.
   
   I'm not sure how valuable `LastProducerId` or `LastProducerEpoch` would be, but I know those are available too, although (I think) those are in-memory-only values used solely to allow retries of transactional calls.





[GitHub] [kafka] hachikuji commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-783868752


   @dengziming Good call. I've added a few test cases. I think we were missing authorization of the topics included in the response. 





[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r581574401



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {
+      transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+    } else if (transactionalId == null || transactionalId.isEmpty) {

Review comment:
       Is `transactionalId` nullable? If so, setting null on `DescribeTransactionsResponseData.TransactionState` can cause an NPE during serialization (see line 263).

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {

Review comment:
       Should we check `active` before looping over all transactional IDs? The other APIs that depend on the ZK controller check `active` once for the whole request.







[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580779375



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeTransactionsRequest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class DescribeTransactionsRequest extends AbstractRequest {
+    public static class Builder extends AbstractRequest.Builder<DescribeTransactionsRequest> {
+        public final DescribeTransactionsRequestData data;
+
+        public Builder(DescribeTransactionsRequestData data) {
+            super(ApiKeys.DESCRIBE_TRANSACTIONS);
+            this.data = data;
+        }
+
+        @Override
+        public DescribeTransactionsRequest build(short version) {
+            return new DescribeTransactionsRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final DescribeTransactionsRequestData data;
+
+    private DescribeTransactionsRequest(DescribeTransactionsRequestData data, short version) {
+        super(ApiKeys.DESCRIBE_TRANSACTIONS, version);
+        this.data = data;
+    }
+
+    @Override
+    public DescribeTransactionsRequestData data() {
+        return data;
+    }
+
+    @Override
+    public DescribeTransactionsResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        Errors error = Errors.forException(e);
+        DescribeTransactionsResponseData response = new DescribeTransactionsResponseData();

Review comment:
       The `throttleTimeMs` is not added to the response.
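
A minimal sketch of the fix this comment points at, assuming the response data object exposes a fluent `setThrottleTimeMs` setter. `ThrottleSketch` and its nested `ResponseData` are hypothetical stand-ins for the generated message class, used only so the example is self-contained.

```java
// Hypothetical stand-in for the generated response data class.
final class ThrottleSketch {
    static final class ResponseData {
        int throttleTimeMs;

        // Fluent setter, so calls can be chained while building the response.
        ResponseData setThrottleTimeMs(int throttleTimeMs) {
            this.throttleTimeMs = throttleTimeMs;
            return this;
        }
    }

    // Builds an error response that carries the throttle time through,
    // instead of leaving it at the default of zero.
    static ResponseData errorResponse(int throttleTimeMs) {
        return new ResponseData().setThrottleTimeMs(throttleTimeMs);
    }
}
```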

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +258,45 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {
+      transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+    } else if (transactionalId == null || transactionalId.isEmpty) {
+      transactionState.setErrorCode(Errors.INVALID_REQUEST.code)
+    } else {
+      txnManager.getTransactionState(transactionalId) match {
+        case Left(error) =>
+          transactionState.setErrorCode(error.code)
+        case Right(None) =>
+          transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code)
+        case Right(Some(coordinatorEpochAndMetadata)) =>
+          val txnMetadata = coordinatorEpochAndMetadata.transactionMetadata
+          txnMetadata.inLock {
+            val partitionsByTopic = CollectionUtils.groupPartitionsByTopic(txnMetadata.topicPartitions.asJava)
+            partitionsByTopic.forEach { (topic, partitions) =>
+              val topicData = new DescribeTransactionsResponseData.TopicData()
+                .setTopic(topic)
+                .setPartitions(partitions)
+              transactionState.topics.add(topicData)
+            }
+
+            transactionState
+              .setErrorCode(Errors.NONE.code)
+              .setProducerId(txnMetadata.producerId)
+              .setProducerEpoch(txnMetadata.producerEpoch)
+              .setTransactionState(txnMetadata.state.toString)

Review comment:
       Since this is part of the serialized data, should we add constant strings to those enums instead of calling `toString`?
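
One way to read this suggestion, sketched with hypothetical names: each enum constant carries an explicit serialized name, so renaming a constant in code cannot silently change what goes over the wire. `WireNameSketch`, `wireName`, and the two states shown are assumptions for the example.

```java
// Hypothetical enum; the constants and their names are illustrative only.
final class WireNameSketch {
    enum TxnState {
        ONGOING("Ongoing"),
        PREPARE_COMMIT("PrepareCommit");

        private final String wireName;

        TxnState(String wireName) {
            this.wireName = wireName;
        }

        // The serialized form is fixed here and is independent of toString().
        String wireName() {
            return wireName;
        }
    }
}
```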

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3271,6 +3272,34 @@ class KafkaApis(val requestChannel: RequestChannel,
       "Apache ZooKeeper mode.")
   }
 
+  def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = {
+    val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
+    val response = new DescribeTransactionsResponseData()
+
+    describeTransactionsRequest.data.transactionalIds.forEach { transactionalId =>
+      val transactionState = if (!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, transactionalId)) {
+        new DescribeTransactionsResponseData.TransactionState()
+          .setTransactionalId(transactionalId)
+          .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
+      } else {
+        txnCoordinator.handleDescribeTransactions(transactionalId)
+      }
+
+      // Include only partitions which the principal is authorized to describe

Review comment:
       Why not convert this to `TOPIC_AUTHORIZATION_FAILED`? Also, excluding unauthorized topics from the data is a bit weird, since callers receive an incomplete result.

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##########
@@ -255,6 +258,45 @@ class TransactionCoordinator(brokerId: Int,
     }
   }
 
+  def handleDescribeTransactions(
+    transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+    val transactionState = new DescribeTransactionsResponseData.TransactionState()
+      .setTransactionalId(transactionalId)
+
+    if (!isActive.get()) {
+      transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+    } else if (transactionalId == null || transactionalId.isEmpty) {
+      transactionState.setErrorCode(Errors.INVALID_REQUEST.code)
+    } else {
+      txnManager.getTransactionState(transactionalId) match {
+        case Left(error) =>
+          transactionState.setErrorCode(error.code)
+        case Right(None) =>
+          transactionState.setErrorCode(Errors.TRANSACTIONAL_ID_NOT_FOUND.code)
+        case Right(Some(coordinatorEpochAndMetadata)) =>
+          val txnMetadata = coordinatorEpochAndMetadata.transactionMetadata
+          txnMetadata.inLock {
+            val partitionsByTopic = CollectionUtils.groupPartitionsByTopic(txnMetadata.topicPartitions.asJava)

Review comment:
       How about using `mapKey` to eliminate this regrouping of the collection?







[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r581233455



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3271,6 +3272,34 @@ class KafkaApis(val requestChannel: RequestChannel,
       "Apache ZooKeeper mode.")
   }
 
+  def handleDescribeTransactionsRequest(request: RequestChannel.Request): Unit = {
+    val describeTransactionsRequest = request.body[DescribeTransactionsRequest]
+    val response = new DescribeTransactionsResponseData()
+
+    describeTransactionsRequest.data.transactionalIds.forEach { transactionalId =>
+      val transactionState = if (!authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, transactionalId)) {
+        new DescribeTransactionsResponseData.TransactionState()
+          .setTransactionalId(transactionalId)
+          .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code)
+      } else {
+        txnCoordinator.handleDescribeTransactions(transactionalId)
+      }
+
+      // Include only partitions which the principal is authorized to describe

Review comment:
       The idea is to avoid exposing topic existence to unauthorized principals. We do the same thing in `Metadata` for example. I agree it is a little weird and I debated it for a little while. Should describe authorization on a transactionalId automatically imply describe authorization on all of the topics that it is writing to? A similar case is `OffsetFetch`: should describe authorization on the groupId imply describe authorization for all topics? We said "no" in that case, so I decided to be consistent.
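
The filtering behaviour described in this comment can be sketched roughly as below. `TopicFilterSketch` and `visibleTopics` are hypothetical names, and the `canDescribe` predicate stands in for whatever authorization check the broker performs. Unauthorized topics are dropped silently rather than reported with an error, so the response does not reveal that they exist.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper; not the code in KafkaApis.
final class TopicFilterSketch {
    // Keeps only the topics the principal is authorized to describe.
    static List<String> visibleTopics(List<String> topics, Predicate<String> canDescribe) {
        List<String> visible = new ArrayList<>();
        for (String topic : topics) {
            if (canDescribe.test(topic)) {
                visible.add(topic);
            }
        }
        return visible;
    }
}
```

The cost, as noted in the review, is that callers may see a partial topic list without any signal that something was omitted.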







[GitHub] [kafka] chia7712 commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r580788136



##########
File path: clients/src/main/resources/common/message/DescribeProducersResponse.json
##########
@@ -35,7 +35,7 @@
         { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
           "about": "The partition error message, which may be null if no additional details are available" },
         { "name": "ActiveProducers", "type": "[]ProducerState", "versions": "0+", "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "0+" },
+          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId" },
           { "name": "ProducerEpoch", "type": "int32", "versions": "0+" },

Review comment:
       The type confuses me. `DescribeProducersResponse` and this protocol use `int32`, but other protocols choose `int16`. Also, the type of `TransactionMetadata#producerEpoch` is `short` rather than an integer. Which one is correct?



