You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/06 01:55:07 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

hachikuji opened a new pull request #10483:
URL: https://github.com/apache/kafka/pull/10483


   This patch contains the `Admin` implementation of the `DescribeTransactions` APIs described in KIP-664: https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r610956477



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(DescribeTransactionsHandler::asCoordinatorKey)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+            Errors error = Errors.forCode(transactionState.errorCode());
+
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    public static CoordinatorKey asCoordinatorKey(String transactionalId) {
+        return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));

Review comment:
       It is using `error.exception`. Do you think that is not sufficient?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r611717669



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(DescribeTransactionsHandler::asCoordinatorKey)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+            Errors error = Errors.forCode(transactionState.errorCode());
+
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    public static CoordinatorKey asCoordinatorKey(String transactionalId) {
+        return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));

Review comment:
       I was thinking about the case where the broker sends back an unknown error code to the client. In those cases, it is useful to have a trace of it, since `error.exception` will result in an `UnknownServerException`. I don't feel strongly about it, though; I only thought that this could be useful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r608009360



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.Objects;
+
+public class CoordinatorKey {

Review comment:
       Hmm.. I think it is a little clearer to use a separate type. If we _do_ add batching to FindCoordinator, then we'd just have to bring it back.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10483:
URL: https://github.com/apache/kafka/pull/10483


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607561841



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {

Review comment:
       It seems that the generic type is useless here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to lookup
+        return new LookupRequestScope(key);

Review comment:
       Will we support batching lookups?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.Objects;
+
+public class CoordinatorKey {

Review comment:
       This class is similar to `FindCoordinatorRequestData`. Can we replace it with the auto-generated protocol class?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to lookup
+        return new LookupRequestScope(key);
+    }
+
+    @Override
+    public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) {
+        CoordinatorKey key = requireSingleton(keys);
+        return new FindCoordinatorRequest.Builder(
+            new FindCoordinatorRequestData()
+                .setKey(key.idValue)
+                .setKeyType(key.type.id())
+        );
+    }
+
+    @Override
+    public LookupResult<CoordinatorKey> handleResponse(
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        CoordinatorKey key = requireSingleton(keys);
+        FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+        Errors error = response.error();
+
+        switch (error) {
+            case NONE:
+                return LookupResult.mapped(key, response.data().nodeId());
+
+            case NOT_COORDINATOR:

Review comment:
       It seems that the `FindCoordinator` RPC will not return `NOT_COORDINATOR`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-824996875


   @dajac Thanks for reviewing. I will go ahead and merge since the only failures look like the usual MM ones. 
   
   @chia7712 Feel free to leave additional comments and I can address them in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607994863



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {

Review comment:
       Oh yeah, this is leftover from a previous iteration.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607569144



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to lookup
+        return new LookupRequestScope(key);

Review comment:
       Will we support batching lookups in the future?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji edited a comment on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-813766742


   @chia7712 @dajac No rush, but when you have time, this is a continuation of the previous work which added `AdminApiDriver`. This patch contains `CoordinatorStrategy`, which is needed to lookup group/transaction coordinators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607994309



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy<V> implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to lookup
+        return new LookupRequestScope(key);

Review comment:
       Yes, a future version of FindCoordinator could implement that. Then we'll just have to decide how to propagate the version to the lookup strategy. I don't think it would be too difficult.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-813766742


   @chia7712 @dajac No rush, but when you have time, this is a continuation of the previous work which added `AdminApiDriver`. This patch contains `CoordinatorStrategy`, which is needed to lookup transaction coordinators.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r612131031



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(DescribeTransactionsHandler::asCoordinatorKey)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+            Errors error = Errors.forCode(transactionState.errorCode());
+
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    public static CoordinatorKey asCoordinatorKey(String transactionalId) {
+        return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));

Review comment:
       We lose the underlying error code when we convert it to `Errors.UNKNOWN_SERVER_ERROR`. I debated a refactor to pass through the code as well, but decided against it in the end, though not for any particularly strong reason. At least we do log a warning in the conversion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#issuecomment-824473278


   @chia7712 Thanks for the comments. I finally had a chance to address them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r610581181



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1508,6 +1508,27 @@ default DescribeProducersResult describeProducers(Collection<TopicPartition> par
      */
     DescribeProducersResult describeProducers(Collection<TopicPartition> partitions, DescribeProducersOptions options);
 
+    /**
+     * Describe the state of a set of transactionalIds. See

Review comment:
       nit: I would use `transactional ids` rather than `transactionalIds` in the description. There are a few other cases in this file.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {
+    private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+
+    DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<TransactionDescription> transactionalIdResult(String transactionalId) {

Review comment:
       `transactionalIdResult` looks weird to me. How about simply using `transaction`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.Set;
+
+public class CoordinatorStrategy implements AdminApiLookupStrategy<CoordinatorKey> {
+    private final Logger log;
+
+    public CoordinatorStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(CoordinatorStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(CoordinatorKey key) {
+        // The `FindCoordinator` API does not support batched lookups, so we use a
+        // separate lookup context for each coordinator key we need to lookup
+        return new LookupRequestScope(key);
+    }
+
+    @Override
+    public FindCoordinatorRequest.Builder buildRequest(Set<CoordinatorKey> keys) {
+        CoordinatorKey key = requireSingleton(keys);
+        return new FindCoordinatorRequest.Builder(
+            new FindCoordinatorRequestData()
+                .setKey(key.idValue)
+                .setKeyType(key.type.id())
+        );
+    }
+
+    @Override
+    public LookupResult<CoordinatorKey> handleResponse(
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        CoordinatorKey key = requireSingleton(keys);
+        FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
+        Errors error = response.error();
+
+        switch (error) {
+            case NONE:
+                return LookupResult.mapped(key, response.data().nodeId());
+
+            case COORDINATOR_NOT_AVAILABLE:
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                log.debug("FindCoordinator request for key {} returned topic-level error {}. Will retry",
+                    key, error);
+                return LookupResult.empty();
+
+            case GROUP_AUTHORIZATION_FAILED:
+                return LookupResult.failed(key, new GroupAuthorizationException("FindCoordinator request for groupId " +
+                    "`" + key + "` failed due to authorization failure", key.idValue));
+
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                return LookupResult.failed(key, new TransactionalIdAuthorizationException("FindCoordinator request for " +
+                    "transactionalId `" + key + "` failed due to authorization failure"));
+
+            default:
+                return LookupResult.failed(key, error.exception("FindCoordinator request for key " +
+                    "`" + key + "` failed due to an unexpected error"));

Review comment:
       I would mention the error in the message here as well.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(DescribeTransactionsHandler::asCoordinatorKey)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = asCoordinatorKey(transactionState.transactionalId());
+            Errors error = Errors.forCode(transactionState.errorCode());
+
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    public static CoordinatorKey asCoordinatorKey(String transactionalId) {
+        return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error"));

Review comment:
       I would mention the error in the message.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {

Review comment:
       nit: Should we add some Javadoc to the public methods?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandlerTest.java
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DescribeTransactionsHandlerTest {
+    private final LogContext logContext = new LogContext();
+
+    @Test
+    public void testBuildRequest() {
+        String transactionalId1 = "foo";
+        String transactionalId2 = "bar";
+        String transactionalId3 = "baz";
+
+        Set<String> transactionalIds = mkSet(transactionalId1, transactionalId2, transactionalId3);
+        DescribeTransactionsHandler handler = new DescribeTransactionsHandler(transactionalIds, logContext);
+
+        assertLookup(handler, transactionalIds);
+        assertLookup(handler, mkSet(transactionalId1));
+        assertLookup(handler, mkSet(transactionalId2, transactionalId3));
+    }
+
+    @Test
+    public void testHandleSuccessfulResponse() {
+        int brokerId = 1;
+        String transactionalId1 = "foo";
+        String transactionalId2 = "bar";
+
+        Set<String> transactionalIds = mkSet(transactionalId1, transactionalId2);
+        DescribeTransactionsHandler handler = new DescribeTransactionsHandler(transactionalIds, logContext);
+
+        DescribeTransactionsResponseData.TransactionState transactionState1 =
+            sampleTransactionState1(transactionalId1);
+        DescribeTransactionsResponseData.TransactionState transactionState2 =
+            sampleTransactionState2(transactionalId2);
+
+        Set<CoordinatorKey> keys = coordinatorKeys(transactionalIds);
+        DescribeTransactionsResponse response = new DescribeTransactionsResponse(new DescribeTransactionsResponseData()
+            .setTransactionStates(asList(transactionState1, transactionState2)));
+
+        ApiResult<CoordinatorKey, TransactionDescription> result = handler.handleResponse(
+            brokerId, keys, response);
+
+        assertEquals(keys, result.completedKeys.keySet());
+        assertMatchingTransactionState(brokerId, transactionState1,
+            result.completedKeys.get(coordinatorKey(transactionalId1)));
+        assertMatchingTransactionState(brokerId, transactionState2,
+            result.completedKeys.get(coordinatorKey(transactionalId2)));
+    }
+
+    @Test
+    public void testHandleErrorResponse() {
+        String transactionalId = "foo";
+        Set<String> transactionalIds = mkSet(transactionalId);
+        DescribeTransactionsHandler handler = new DescribeTransactionsHandler(transactionalIds, logContext);
+        assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
+        assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_NOT_FOUND);
+        assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
+        assertRetriableError(handler, transactionalId, Errors.COORDINATOR_LOAD_IN_PROGRESS);
+        assertUnmappedKey(handler, transactionalId, Errors.NOT_COORDINATOR);
+        assertUnmappedKey(handler, transactionalId, Errors.COORDINATOR_NOT_AVAILABLE);
+    }
+
+    private void assertFatalError(
+        DescribeTransactionsHandler handler,
+        String transactionalId,
+        Errors error
+    ) {
+        CoordinatorKey key = coordinatorKey(transactionalId);
+        ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, transactionalId, error);
+        assertEquals(emptyList(), result.unmappedKeys);
+        assertEquals(mkSet(key), result.failedKeys.keySet());
+
+        Throwable throwable = result.failedKeys.get(key);
+        assertTrue(error.exception().getClass().isInstance(throwable));
+    }
+
+    private void assertRetriableError(
+        DescribeTransactionsHandler handler,
+        String transactionalId,
+        Errors error
+    ) {
+        ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, transactionalId, error);
+        assertEquals(emptyList(), result.unmappedKeys);
+        assertEquals(emptyMap(), result.failedKeys);
+    }
+
+    private void assertUnmappedKey(
+        DescribeTransactionsHandler handler,
+        String transactionalId,
+        Errors error
+    ) {
+        CoordinatorKey key = coordinatorKey(transactionalId);
+        ApiResult<CoordinatorKey, TransactionDescription> result = handleResponseError(handler, transactionalId, error);
+        assertEquals(emptyMap(), result.failedKeys);
+        assertEquals(singletonList(key), result.unmappedKeys);
+    }
+
+    private ApiResult<CoordinatorKey, TransactionDescription> handleResponseError(
+        DescribeTransactionsHandler handler,
+        String transactionalId,
+        Errors error
+    ) {
+        int brokerId = 1;
+
+        CoordinatorKey key = coordinatorKey(transactionalId);
+        Set<CoordinatorKey> keys = mkSet(key);
+
+        DescribeTransactionsResponseData.TransactionState transactionState = new DescribeTransactionsResponseData.TransactionState()
+            .setErrorCode(error.code())
+            .setTransactionalId(transactionalId);
+
+        DescribeTransactionsResponse response = new DescribeTransactionsResponse(new DescribeTransactionsResponseData()
+            .setTransactionStates(singletonList(transactionState)));
+
+        ApiResult<CoordinatorKey, TransactionDescription> result = handler.handleResponse(brokerId, keys, response);
+        assertEquals(emptyMap(), result.completedKeys);
+        return result;
+    }
+
+    private void assertLookup(
+        DescribeTransactionsHandler handler,
+        Set<String> transactionalIds
+    ) {
+        Set<CoordinatorKey> keys = coordinatorKeys(transactionalIds);
+        DescribeTransactionsRequest.Builder request = handler.buildRequest(1, keys);
+        assertEquals(transactionalIds, new HashSet<>(request.data.transactionalIds()));
+    }
+
+    private static CoordinatorKey coordinatorKey(String transactionalId) {
+        return new CoordinatorKey(transactionalId, FindCoordinatorRequest.CoordinatorType.TRANSACTION);
+    }

Review comment:
       nit: We have the same helper methods in a few places. I wonder if we could have only one and use it everywhere? `CoordinatorKey` might be a good place (e.g. `CoordinatorKey#transactionalKey`).

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -5432,6 +5437,47 @@ public void testDescribeProducersRetryAfterDisconnect() throws Exception {
         }
     }
 
+    @Test
+    public void testDescribeTransactions() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            String transactionalId = "foo";
+            Node coordinator = env.cluster().nodes().iterator().next();
+            TransactionDescription expected = new TransactionDescription(
+                coordinator.id(), TransactionState.COMPLETE_COMMIT, 12345L,
+                15, 10000L, OptionalLong.empty(), emptySet());
+

Review comment:
       nit: One empty line could be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r610962235



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {
+    private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+
+    DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<TransactionDescription> transactionalIdResult(String transactionalId) {

Review comment:
       I agree the name is not great, but I was torn on an alternative. I went with `description`. Let me know what you think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r611715337



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeTransactionsResult.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.admin.internals.CoordinatorKey;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class DescribeTransactionsResult {
+    private final Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures;
+
+    DescribeTransactionsResult(Map<CoordinatorKey, KafkaFutureImpl<TransactionDescription>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<TransactionDescription> transactionalIdResult(String transactionalId) {

Review comment:
       `description` sounds good to me.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dengziming commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r607563049



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorKey.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+
+import java.util.Objects;
+
+public class CoordinatorKey {

Review comment:
       This class is similar to `FindCoordinatorRequestData`; can we replace it with the auto-generated protocol class?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r618008786



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {
+    ONGOING("Ongoing"),
+    PREPARE_ABORT("PrepareAbort"),
+    PREPARE_COMMIT("PrepareCommit"),
+    COMPLETE_ABORT("CompleteAbort"),
+    COMPLETE_COMMIT("CompleteCommit"),
+    EMPTY("Empty"),
+    PREPARE_EPOCH_FENCE("PrepareEpochFence"),
+    UNKNOWN("Unknown");
+
+    private final static HashMap<String, TransactionState> NAME_TO_ENUM;
+
+    static {
+        NAME_TO_ENUM = new HashMap<>();
+        for (TransactionState state : TransactionState.values()) {
+            NAME_TO_ENUM.put(state.name, state);
+        }
+    }
+
+    private final String name;
+
+    TransactionState(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    public static TransactionState parse(String name) {

Review comment:
       I added a test case which fails if we fail to keep them in sync. I thought about adding a separate type, but I'm not sure it's worth it and we actually decided not to expose the transient `Dead` state.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] chia7712 commented on a change in pull request #10483: KAFKA-12586; Add `DescribeTransactions` Admin API

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10483:
URL: https://github.com/apache/kafka/pull/10483#discussion_r612182543



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,

Review comment:
       Why is this type `Integer` rather than `int`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());

Review comment:
       Should it verify the `type`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategyTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CoordinatorStrategyTest {
+
+    @Test
+    public void testBuildLookupRequest() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new LogContext());
+        FindCoordinatorRequest.Builder request = strategy.buildRequest(singleton(
+            CoordinatorKey.byGroupId("foo")));
+        assertEquals("foo", request.data().key());
+        assertEquals(CoordinatorType.GROUP, CoordinatorType.forId(request.data().keyType()));
+    }
+
+    @Test
+    public void testBuildLookupRequestRequiresOneKey() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new LogContext());
+        assertThrows(IllegalArgumentException.class, () -> strategy.buildRequest(Collections.emptySet()));
+
+        CoordinatorKey group1 = CoordinatorKey.byGroupId("foo");
+        CoordinatorKey group2 = CoordinatorKey.byGroupId("bar");
+        assertThrows(IllegalArgumentException.class, () -> strategy.buildRequest(mkSet(group1, group2)));
+    }
+
+    @Test
+    public void testSuccessfulCoordinatorLookup() {
+        CoordinatorKey group = CoordinatorKey.byGroupId("foo");
+
+        FindCoordinatorResponseData responseData = new FindCoordinatorResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setHost("localhost")
+            .setPort(9092)
+            .setNodeId(1);
+
+        AdminApiLookupStrategy.LookupResult<CoordinatorKey> result = runLookup(group, responseData);
+        assertEquals(singletonMap(group, 1), result.mappedKeys);
+        assertEquals(emptyMap(), result.failedKeys);
+    }
+
+    @Test
+    public void testRetriableCoordinatorLookup() {
+        testRetriableCoordinatorLookup(Errors.COORDINATOR_LOAD_IN_PROGRESS);
+        testRetriableCoordinatorLookup(Errors.COORDINATOR_NOT_AVAILABLE);
+    }
+
+    public void testRetriableCoordinatorLookup(Errors error) {

Review comment:
       Could you change the modifier from `public` to `private`? Otherwise, it looks like a bug where we forgot to add the `@Test` annotation.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {

Review comment:
       Does it need `@InterfaceStability.Evolving`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,

Review comment:
       ditto

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {
+    ONGOING("Ongoing"),
+    PREPARE_ABORT("PrepareAbort"),
+    PREPARE_COMMIT("PrepareCommit"),
+    COMPLETE_ABORT("CompleteAbort"),
+    COMPLETE_COMMIT("CompleteCommit"),
+    EMPTY("Empty"),
+    PREPARE_EPOCH_FENCE("PrepareEpochFence"),
+    UNKNOWN("Unknown");
+
+    private final static HashMap<String, TransactionState> NAME_TO_ENUM;
+
+    static {
+        NAME_TO_ENUM = new HashMap<>();
+        for (TransactionState state : TransactionState.values()) {
+            NAME_TO_ENUM.put(state.name, state);
+        }
+    }
+
+    private final String name;
+
+    TransactionState(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    public static TransactionState parse(String name) {

Review comment:
       The name is now part of the serialized data. Should we unify `TransactionState` across the Scala and Java code?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategyTest.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CoordinatorStrategyTest {
+
+    @Test
+    public void testBuildLookupRequest() {
+        CoordinatorStrategy strategy = new CoordinatorStrategy(new LogContext());
+        FindCoordinatorRequest.Builder request = strategy.buildRequest(singleton(
+            CoordinatorKey.byGroupId("foo")));
+        assertEquals("foo", request.data().key());
+        assertEquals(CoordinatorType.GROUP, CoordinatorType.forId(request.data().keyType()));
+    }
+
+    @Test
+    public void testBuildLookupRequestRequiresOneKey() {

Review comment:
       `handleResponse` needs a similar unit test (XXXRequiresOneKey).

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = CoordinatorKey.byTransactionalId(
+                transactionState.transactionalId());
+            if (!keys.contains(transactionalIdKey)) {
+                log.warn("Response included transactionalId `{}`, which was not requested",
+                    transactionState.transactionalId());
+                continue;
+            }
+
+            Errors error = Errors.forCode(transactionState.errorCode());
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",
+                    transactionalIdKey.idValue, error);
+                break;
+
+            default:
+                failed.put(transactionalIdKey, error.exception("DescribeTransactions request for " +
+                    "transactionalId `" + transactionalIdKey.idValue + "` failed due to unexpected error "));

Review comment:
       Redundant trailing space inside the string literal; it should be "to unexpected error".

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TransactionState.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.HashMap;
+
+public enum TransactionState {
+    ONGOING("Ongoing"),
+    PREPARE_ABORT("PrepareAbort"),
+    PREPARE_COMMIT("PrepareCommit"),
+    COMPLETE_ABORT("CompleteAbort"),
+    COMPLETE_COMMIT("CompleteCommit"),
+    EMPTY("Empty"),
+    PREPARE_EPOCH_FENCE("PrepareEpochFence"),
+    UNKNOWN("Unknown");
+
+    private final static HashMap<String, TransactionState> NAME_TO_ENUM;

Review comment:
       How about using a Java stream?
   ```java
   Arrays.stream(TransactionState.values())
                   .collect(Collectors.toMap(TransactionState::name, Function.identity()))
   ```

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeTransactionsHandler.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.TransactionDescription;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
+import org.apache.kafka.common.message.DescribeTransactionsRequestData;
+import org.apache.kafka.common.message.DescribeTransactionsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeTransactionsRequest;
+import org.apache.kafka.common.requests.DescribeTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class DescribeTransactionsHandler implements AdminApiHandler<CoordinatorKey, TransactionDescription> {
+    private final LogContext logContext;
+    private final Logger log;
+    private final Set<CoordinatorKey> keys;
+
+    public DescribeTransactionsHandler(
+        Collection<String> transactionalIds,
+        LogContext logContext
+    ) {
+        this.keys = buildKeySet(transactionalIds);
+        this.log = logContext.logger(DescribeTransactionsHandler.class);
+        this.logContext = logContext;
+    }
+
+    private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
+        return transactionalIds.stream()
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
+    }
+
+    @Override
+    public String apiName() {
+        return "describeTransactions";
+    }
+
+    @Override
+    public Keys<CoordinatorKey> initializeKeys() {
+        return Keys.dynamicMapped(keys, new CoordinatorStrategy(logContext));
+    }
+
+    @Override
+    public DescribeTransactionsRequest.Builder buildRequest(
+        Integer brokerId,
+        Set<CoordinatorKey> keys
+    ) {
+        DescribeTransactionsRequestData request = new DescribeTransactionsRequestData();
+        List<String> transactionalIds = keys.stream().map(key -> key.idValue).collect(Collectors.toList());
+        request.setTransactionalIds(transactionalIds);
+        return new DescribeTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<CoordinatorKey, TransactionDescription> handleResponse(
+        Integer brokerId,
+        Set<CoordinatorKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        DescribeTransactionsResponse response = (DescribeTransactionsResponse) abstractResponse;
+        Map<CoordinatorKey, TransactionDescription> completed = new HashMap<>();
+        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        List<CoordinatorKey> unmapped = new ArrayList<>();
+
+        for (DescribeTransactionsResponseData.TransactionState transactionState : response.data().transactionStates()) {
+            CoordinatorKey transactionalIdKey = CoordinatorKey.byTransactionalId(
+                transactionState.transactionalId());
+            if (!keys.contains(transactionalIdKey)) {
+                log.warn("Response included transactionalId `{}`, which was not requested",
+                    transactionState.transactionalId());
+                continue;
+            }
+
+            Errors error = Errors.forCode(transactionState.errorCode());
+            if (error != Errors.NONE) {
+                handleError(transactionalIdKey, error, failed, unmapped);
+                continue;
+            }
+
+            OptionalLong transactionStartTimeMs = transactionState.transactionStartTimeMs() < 0 ?
+                OptionalLong.empty() :
+                OptionalLong.of(transactionState.transactionStartTimeMs());
+
+            completed.put(transactionalIdKey, new TransactionDescription(
+                brokerId,
+                TransactionState.parse(transactionState.transactionState()),
+                transactionState.producerId(),
+                transactionState.producerEpoch(),
+                transactionState.transactionTimeoutMs(),
+                transactionStartTimeMs,
+                collectTopicPartitions(transactionState)
+            ));
+        }
+
+        return new ApiResult<>(completed, failed, unmapped);
+    }
+
+    private Set<TopicPartition> collectTopicPartitions(
+        DescribeTransactionsResponseData.TransactionState transactionState
+    ) {
+        Set<TopicPartition> res = new HashSet<>();
+        for (DescribeTransactionsResponseData.TopicData topicData : transactionState.topics()) {
+            String topic = topicData.topic();
+            for (Integer partitionId : topicData.partitions()) {
+                res.add(new TopicPartition(topic, partitionId));
+            }
+        }
+        return res;
+    }
+
+    private void handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        List<CoordinatorKey> unmapped
+    ) {
+        switch (error) {
+            case TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
+                failed.put(transactionalIdKey, new TransactionalIdAuthorizationException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed due to authorization failure"));
+                break;
+
+            case TRANSACTIONAL_ID_NOT_FOUND:
+                failed.put(transactionalIdKey, new TransactionalIdNotFoundException(
+                    "DescribeTransactions request for transactionalId `" + transactionalIdKey.idValue + "` " +
+                        "failed because the ID could not be found"));
+                break;
+
+            case COORDINATOR_LOAD_IN_PROGRESS:
+                // If the coordinator is in the middle of loading, then we just need to retry
+                log.debug("DescribeTransactions request for transactionalId `{}` failed because the " +
+                        "coordinator is still in the process of loading state. Will retry",
+                    transactionalIdKey.idValue);
+                break;
+
+            case NOT_COORDINATOR:
+            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is unavailable or there was a coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                unmapped.add(transactionalIdKey);
+                log.debug("DescribeTransactions request for transactionalId `{}` returned error {}. Will retry",

Review comment:
       This log message seems to be inconsistent with the comment "retry the `FindCoordinator` request".

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/TransactionDescription.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Objects;
+import java.util.OptionalLong;
+import java.util.Set;
+
+public class TransactionDescription {

Review comment:
       Does this need `@InterfaceStability.Evolving`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org