You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/30 13:40:21 UTC

[GitHub] [kafka] dajac commented on a change in pull request #10616: KAFKA-12709; Add Admin API for `ListTransactions`

dajac commented on a change in pull request #10616:
URL: https://github.com/apache/kafka/pull/10616#discussion_r623846200



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsOptions.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Options for {@link Admin#listTransactions()}.
+ *
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsOptions extends AbstractOptions<ListTransactionsOptions> {
+    private Set<TransactionState> filteredStates = Collections.emptySet();
+    private Set<Long> filteredProducerIds = Collections.emptySet();
+
+    public ListTransactionsOptions filterStates(Collection<TransactionState> states) {

Review comment:
       Should we add javadoc to the various methods here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#listTransactions()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsResult {
+    private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+
+    ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<TransactionListing>> all() {
+        return allByBrokerId().thenApply(map -> {
+            List<TransactionListing> allListings = new ArrayList<>();
+            for (Collection<TransactionListing> listings : map.values()) {
+                allListings.addAll(listings);
+            }
+            return allListings;
+        });
+    }
+
+    public KafkaFuture<Set<Integer>> brokerIds() {
+        return future.thenApply(map -> new HashSet<>(map.keySet()));
+    }

Review comment:
       I wonder if this one is really necessary. What's the reasoning behind it?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/ListTransactionsHandler.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.ListTransactionsOptions;
+import org.apache.kafka.clients.admin.TransactionListing;
+import org.apache.kafka.clients.admin.TransactionState;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.message.ListTransactionsRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ListTransactionsRequest;
+import org.apache.kafka.common.requests.ListTransactionsResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class ListTransactionsHandler implements AdminApiHandler<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> {
+    private final Logger log;
+    private final ListTransactionsOptions options;
+    private final AllBrokersStrategy lookupStrategy;
+
+    public ListTransactionsHandler(
+        ListTransactionsOptions options,
+        LogContext logContext
+    ) {
+        this.options = options;
+        this.log = logContext.logger(ListTransactionsHandler.class);
+        this.lookupStrategy = new AllBrokersStrategy(logContext);
+    }
+
+    public static AllBrokersStrategy.AllBrokersFuture<Collection<TransactionListing>> newFuture() {
+        return new AllBrokersStrategy.AllBrokersFuture<>();
+    }
+
+    @Override
+    public String apiName() {
+        return "listTransactions";
+    }
+
+    @Override
+    public AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> lookupStrategy() {
+        return lookupStrategy;
+    }
+
+    @Override
+    public ListTransactionsRequest.Builder buildRequest(
+        int brokerId,
+        Set<AllBrokersStrategy.BrokerKey> keys
+    ) {
+        ListTransactionsRequestData request = new ListTransactionsRequestData();
+        request.setProducerIdFilters(new ArrayList<>(options.filteredProducerIds()));
+        request.setStateFilters(options.filteredStates().stream()
+            .map(TransactionState::toString)
+            .collect(Collectors.toList()));
+        return new ListTransactionsRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<AllBrokersStrategy.BrokerKey, Collection<TransactionListing>> handleResponse(
+        int brokerId,
+        Set<AllBrokersStrategy.BrokerKey> keys,
+        AbstractResponse abstractResponse
+    ) {
+        AllBrokersStrategy.BrokerKey key = requireSingleton(keys, brokerId);
+
+        ListTransactionsResponse response = (ListTransactionsResponse) abstractResponse;
+        Errors error = Errors.forCode(response.data().errorCode());
+
+        if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
+            log.debug("The `ListTransactions` request sent to broker {} failed because the " +
+                "coordinator is still loading state. Will try again after backing off", brokerId);
+            return ApiResult.empty();
+        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+            log.debug("The `ListTransactions` request sent to broker {} failed because the " +
+                "coordinator is shutting down", brokerId);
+            return ApiResult.failed(key, new CoordinatorNotAvailableException("ListTransactions " +
+                "request sent to broker " + brokerId + " failed because the coordinator is shutting down"));
+        } else if (error != Errors.NONE) {
+            log.error("The `ListTransactions` request sent to broker {} failed because of an " +
+                "unexpected error {}", brokerId, error);
+            return ApiResult.failed(key, error.exception("ListTransactions request " +
+                "sent to broker " + brokerId + " failed with an unexpected exception"));
+        } else {
+            List<TransactionListing> listings = response.data().transactionStates().stream()
+                .map(transactionState -> new TransactionListing(
+                    transactionState.transactionalId(),
+                    transactionState.producerId(),
+                    TransactionState.parse(transactionState.transactionState())))
+                .collect(Collectors.toList());
+            return ApiResult.completed(key, listings);
+        }
+    }
+
+    private AllBrokersStrategy.BrokerKey requireSingleton(
+        Set<AllBrokersStrategy.BrokerKey> keys,
+        int brokerId
+    ) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set" + keys);

Review comment:
       nit: `Unexpected key set` -> `Unexpected key set: `. The `:` might not be necessary but we should add a space. The same applies to the `IllegalArgumentException` below.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#listTransactions()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsResult {
+    private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+
+    ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<TransactionListing>> all() {
+        return allByBrokerId().thenApply(map -> {
+            List<TransactionListing> allListings = new ArrayList<>();
+            for (Collection<TransactionListing> listings : map.values()) {
+                allListings.addAll(listings);
+            }
+            return allListings;
+        });
+    }
+
+    public KafkaFuture<Set<Integer>> brokerIds() {
+        return future.thenApply(map -> new HashSet<>(map.keySet()));
+    }
+
+    public KafkaFuture<Map<Integer, Collection<TransactionListing>>> allByBrokerId() {
+        KafkaFutureImpl<Map<Integer, Collection<TransactionListing>>> allFuture = new KafkaFutureImpl<>();
+        Map<Integer, Collection<TransactionListing>> allListingsMap = new HashMap<>();
+
+        future.whenComplete((map, topLevelException) -> {
+            if (topLevelException != null) {
+                allFuture.completeExceptionally(topLevelException);
+                return;
+            }
+
+            Set<Integer> remainingResponses = new HashSet<>(map.keySet());
+            for (Map.Entry<Integer, KafkaFutureImpl<Collection<TransactionListing>>> entry : map.entrySet()) {
+                Integer brokerId = entry.getKey();
+                KafkaFutureImpl<Collection<TransactionListing>> future = entry.getValue();
+                future.whenComplete((listings, brokerException) -> {
+

Review comment:
       nit: This empty line is not necessary.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##########
@@ -1533,6 +1533,28 @@ default AbortTransactionResult abortTransaction(AbortTransactionSpec spec) {
      */
     AbortTransactionResult abortTransaction(AbortTransactionSpec spec, AbortTransactionOptions options);
 
+    /**
+     * List active transactions in the cluster. See
+     * {@link #listTransactions(ListTransactionsOptions)} for more details.
+     *
+     * @return The result
+     */
+    default ListTransactionsResult listTransactions() {
+        return listTransactions(new ListTransactionsOptions());
+    }
+
+    /**
+     * List active transactions in the cluster. This will query all potential transaction
+     * coordinators in the cluster and collect the state of all transactionalIds. Users

Review comment:
       nit: `transactionalIds` -> `transactions`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ListTransactionsResult.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The result of the {@link Admin#listTransactions()} call.
+ * <p>
+ * The API of this class is evolving, see {@link Admin} for details.
+ */
+@InterfaceStability.Evolving
+public class ListTransactionsResult {
+    private final KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future;
+
+    ListTransactionsResult(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<TransactionListing>>>> future) {
+        this.future = future;
+    }
+
+    public KafkaFuture<Collection<TransactionListing>> all() {
+        return allByBrokerId().thenApply(map -> {
+            List<TransactionListing> allListings = new ArrayList<>();
+            for (Collection<TransactionListing> listings : map.values()) {
+                allListings.addAll(listings);
+            }
+            return allListings;
+        });
+    }
+
+    public KafkaFuture<Set<Integer>> brokerIds() {
+        return future.thenApply(map -> new HashSet<>(map.keySet()));
+    }
+
+    public KafkaFuture<Map<Integer, Collection<TransactionListing>>> allByBrokerId() {
+        KafkaFutureImpl<Map<Integer, Collection<TransactionListing>>> allFuture = new KafkaFutureImpl<>();
+        Map<Integer, Collection<TransactionListing>> allListingsMap = new HashMap<>();
+
+        future.whenComplete((map, topLevelException) -> {
+            if (topLevelException != null) {
+                allFuture.completeExceptionally(topLevelException);
+                return;
+            }
+
+            Set<Integer> remainingResponses = new HashSet<>(map.keySet());
+            for (Map.Entry<Integer, KafkaFutureImpl<Collection<TransactionListing>>> entry : map.entrySet()) {
+                Integer brokerId = entry.getKey();
+                KafkaFutureImpl<Collection<TransactionListing>> future = entry.getValue();

Review comment:
       nit: We could use `forEach` here. It makes the code more concise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org