Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/27 08:51:46 UTC

[GitHub] [kafka] dajac commented on a change in pull request #10599: KAFKA-12716; Add `Admin` API to abort transactions

dajac commented on a change in pull request #10599:
URL: https://github.com/apache/kafka/pull/10599#discussion_r620974574



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {

Review comment:
       Should we add some javadoc to the classes/methods published in our public API?
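       For illustration, here is a minimal sketch of what such Javadoc could look like on `AbortTransactionResult`. The `{@link Admin#abortTransaction(AbortTransactionSpec, AbortTransactionOptions)}` reference and the wording are assumptions based on the usual `Admin`/`*Result` conventions, not taken from this diff, and the method body is simplified:

```java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.internals.KafkaFutureImpl;

import java.util.Map;

/**
 * The result of {@link Admin#abortTransaction(AbortTransactionSpec, AbortTransactionOptions)}.
 *
 * The API of this class is evolving; see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AbortTransactionResult {
    private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;

    AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
        this.futures = futures;
    }

    /**
     * Return a future which completes when the transaction specified by the
     * {@link AbortTransactionSpec} has been aborted on the partition leader,
     * or completes exceptionally with the first error encountered.
     */
    public KafkaFuture<Void> all() {
        // Body simplified for the sketch; allOf propagates the first failure.
        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
    }
}
```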

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionSpec.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Objects;
+
+@InterfaceStability.Evolving
+public class AbortTransactionSpec {
+    private final TopicPartition topicPartition;
+    private final long producerId;
+    private final short producerEpoch;
+    private final int coordinatorEpoch;

Review comment:
       In the KIP, you also mentioned `transactionStartOffset`. Is it no longer required, or do you plan to add it later on?
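       For reference, keeping the field would presumably just mean carrying one more value in the spec. This is a hypothetical sketch only, since the PR as written does not include it and `transactionStartOffset` is the KIP's name for it:

```java
package org.apache.kafka.clients.admin;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;

// Hypothetical sketch: AbortTransactionSpec with the KIP-mentioned field added.
@InterfaceStability.Evolving
public class AbortTransactionSpec {
    private final TopicPartition topicPartition;
    private final long producerId;
    private final short producerEpoch;
    private final int coordinatorEpoch;
    private final long transactionStartOffset; // from the KIP; not in this PR

    public AbortTransactionSpec(
        TopicPartition topicPartition,
        long producerId,
        short producerEpoch,
        int coordinatorEpoch,
        long transactionStartOffset
    ) {
        this.topicPartition = topicPartition;
        this.producerId = producerId;
        this.producerEpoch = producerEpoch;
        this.coordinatorEpoch = coordinatorEpoch;
        this.transactionStartOffset = transactionStartOffset;
    }
}
```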

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandlerTest.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AbortTransactionHandlerTest {
+    private final LogContext logContext = new LogContext();
+    private final TopicPartition topicPartition = new TopicPartition("foo", 5);
+    private final AbortTransactionSpec abortSpec = new AbortTransactionSpec(
+        topicPartition, 12345L, (short) 15, 4321);
+
+    @Test
+    public void testInvalidBuildRequestCall() {
+        AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext);
+        assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1,
+            emptySet()));
+        assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1,
+            mkSet(new TopicPartition("foo", 1))));
+        assertThrows(IllegalArgumentException.class, () -> handler.buildRequest(1,
+            mkSet(topicPartition, new TopicPartition("foo", 1))));
+    }
+
+    @Test
+    public void testValidBuildRequestCall() {
+        AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext);
+        WriteTxnMarkersRequest.Builder request = handler.buildRequest(1, singleton(topicPartition));
+        assertEquals(1, request.data.markers().size());
+
+        WriteTxnMarkersRequestData.WritableTxnMarker markerRequest = request.data.markers().get(0);
+        assertEquals(abortSpec.producerId(), markerRequest.producerId());
+        assertEquals(abortSpec.producerEpoch(), markerRequest.producerEpoch());
+        assertEquals(abortSpec.coordinatorEpoch(), markerRequest.coordinatorEpoch());
+        assertEquals(1, markerRequest.topics().size());
+
+        WriteTxnMarkersRequestData.WritableTxnMarkerTopic topicRequest = markerRequest.topics().get(0);
+        assertEquals(abortSpec.topicPartition().topic(), topicRequest.name());
+        assertEquals(singletonList(abortSpec.topicPartition().partition()), topicRequest.partitionIndexes());
+    }
+
+    @Test
+    public void testInvalidHandleResponseCall() {
+        AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext);
+        WriteTxnMarkersResponseData response = new WriteTxnMarkersResponseData();
+        assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1,
+            emptySet(), new WriteTxnMarkersResponse(response)));
+        assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1,
+            mkSet(new TopicPartition("foo", 1)), new WriteTxnMarkersResponse(response)));
+        assertThrows(IllegalArgumentException.class, () -> handler.handleResponse(1,
+            mkSet(topicPartition, new TopicPartition("foo", 1)), new WriteTxnMarkersResponse(response)));
+    }
+
+    @Test
+    public void testInvalidResponse() {
+        AbortTransactionHandler handler = new AbortTransactionHandler(abortSpec, logContext);
+
+        WriteTxnMarkersResponseData response = new WriteTxnMarkersResponseData();
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerResult markerResponse =
+            new WriteTxnMarkersResponseData.WritableTxnMarkerResult();
+        response.markers().add(markerResponse);
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        markerResponse.setProducerId(abortSpec.producerId());
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult topicResponse =
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult();
+        markerResponse.topics().add(topicResponse);
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        topicResponse.setName(abortSpec.topicPartition().topic());
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult partitionResponse =
+            new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult();
+        topicResponse.partitions().add(partitionResponse);
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        partitionResponse.setPartitionIndex(abortSpec.topicPartition().partition());
+        topicResponse.setName(abortSpec.topicPartition().topic() + "random");
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+
+        topicResponse.setName(abortSpec.topicPartition().topic());
+        markerResponse.setProducerId(abortSpec.producerId() + 1);
+        assertFailed(KafkaException.class, topicPartition, handler.handleResponse(1, singleton(topicPartition),
+            new WriteTxnMarkersResponse(response)));
+    }
+
+    @Test
+    public void testSuccessfulResponse() {
+        assertCompleted(abortSpec.topicPartition(), handleWithError(abortSpec, Errors.NONE));
+    }
+
+    @Test
+    public void testRetriableErrors() {
+        assertUnmapped(abortSpec.topicPartition(), handleWithError(abortSpec, Errors.NOT_LEADER_OR_FOLLOWER));
+        assertUnmapped(abortSpec.topicPartition(), handleWithError(abortSpec, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+        assertUnmapped(abortSpec.topicPartition(), handleWithError(abortSpec, Errors.REPLICA_NOT_AVAILABLE));
+        assertUnmapped(abortSpec.topicPartition(), handleWithError(abortSpec, Errors.BROKER_NOT_AVAILABLE));
+    }
+
+    @Test
+    public void testFatalErrors() {

Review comment:
       Should we also test an unexpected error?
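       For example, something along these lines. This is only a sketch: the diff is truncated before `handleError`'s default branch and before the `assertFailed`/`handleWithError` helpers, so the expected exception class is an assumption:

```java
@Test
public void testUnexpectedError() {
    // Sketch: use an error the handler has no explicit case for and assert that it
    // fails the partition rather than being retried or unmapped. The exact exception
    // type depends on handleError's default branch, which is not visible in this diff.
    assertFailed(KafkaException.class, abortSpec.topicPartition(),
        handleWithError(abortSpec, Errors.UNKNOWN_SERVER_ERROR));
}
```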

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {
+    private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;
+
+    AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<Void> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))
+            .thenApply(nil -> {
+                for (Map.Entry<TopicPartition, KafkaFutureImpl<Void>> entry : futures.entrySet()) {
+                    try {
+                        KafkaFutureImpl<Void> future = entry.getValue();
+                        future.get();
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, because allOf ensured that all the futures completed successfully.
+                        throw new KafkaException(e);
+                    }
+                }
+                return null;
+            });

Review comment:
       Is the `thenApply` really necessary here? It seems that `KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]))` already returns what we need.
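       A minimal sketch of the suggested simplification, assuming `all()` keeps its current signature. `KafkaFuture.allOf` already yields a `KafkaFuture<Void>` that completes once every per-partition future has completed and fails if any of them failed:

```java
public KafkaFuture<Void> all() {
    // allOf already propagates the first failure from any per-partition future,
    // so no extra inspection loop is needed.
    return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
}
```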

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AbortTransactionHandler.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.AbortTransactionSpec;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
+import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
+import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+
+public class AbortTransactionHandler implements AdminApiHandler<TopicPartition, Void> {
+    private final Logger log;
+    private final AbortTransactionSpec abortSpec;
+    private final PartitionLeaderStrategy lookupStrategy;
+
+    public AbortTransactionHandler(
+        AbortTransactionSpec abortSpec,
+        LogContext logContext
+    ) {
+        this.abortSpec = abortSpec;
+        this.log = logContext.logger(AbortTransactionHandler.class);
+        this.lookupStrategy = new PartitionLeaderStrategy(logContext);
+    }
+
+    @Override
+    public String apiName() {
+        return "abortTransaction";
+    }
+
+    @Override
+    public Keys<TopicPartition> initializeKeys() {
+        return Keys.dynamicMapped(
+            Collections.singleton(abortSpec.topicPartition()),
+            lookupStrategy
+        );
+    }
+
+    @Override
+    public WriteTxnMarkersRequest.Builder buildRequest(
+        int brokerId,
+        Set<TopicPartition> topicPartitions
+    ) {
+        validateTopicPartitions(topicPartitions);
+
+        WriteTxnMarkersRequestData.WritableTxnMarker marker = new WriteTxnMarkersRequestData.WritableTxnMarker()
+            .setCoordinatorEpoch(abortSpec.coordinatorEpoch())
+            .setProducerEpoch(abortSpec.producerEpoch())
+            .setProducerId(abortSpec.producerId())
+            .setTransactionResult(false);
+
+        marker.topics().add(new WriteTxnMarkersRequestData.WritableTxnMarkerTopic()
+            .setName(abortSpec.topicPartition().topic())
+            .setPartitionIndexes(singletonList(abortSpec.topicPartition().partition()))
+        );
+
+        WriteTxnMarkersRequestData request = new WriteTxnMarkersRequestData();
+        request.markers().add(marker);
+
+        return new WriteTxnMarkersRequest.Builder(request);
+    }
+
+    @Override
+    public ApiResult<TopicPartition, Void> handleResponse(
+        int brokerId,
+        Set<TopicPartition> topicPartitions,
+        AbstractResponse abstractResponse
+    ) {
+        validateTopicPartitions(topicPartitions);
+
+        WriteTxnMarkersResponse response = (WriteTxnMarkersResponse) abstractResponse;
+        List<WriteTxnMarkersResponseData.WritableTxnMarkerResult> markerResponses = response.data().markers();
+
+        if (markerResponses.size() != 1 || markerResponses.get(0).producerId() != abortSpec.producerId()) {
+            return ApiResult.failed(abortSpec.topicPartition(), new KafkaException("WriteTxnMarkers response " +
+                "included unexpected marker entries: " + markerResponses + "(expected to find exactly one " +
+                "entry with producerId " + abortSpec.producerId() + ")"));
+        }
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerResult markerResponse = markerResponses.get(0);
+        List<WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult> topicResponses = markerResponse.topics();
+
+        if (topicResponses.size() != 1 || !topicResponses.get(0).name().equals(abortSpec.topicPartition().topic())) {
+            return ApiResult.failed(abortSpec.topicPartition(), new KafkaException("WriteTxnMarkers response " +
+                "included unexpected topic entries: " + markerResponses + "(expected to find exactly one " +
+                "entry with topic partition " + abortSpec.topicPartition() + ")"));
+        }
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult topicResponse = topicResponses.get(0);
+        List<WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult> partitionResponses =
+            topicResponse.partitions();
+
+        if (partitionResponses.size() != 1 || partitionResponses.get(0).partitionIndex() != abortSpec.topicPartition().partition()) {
+            return ApiResult.failed(abortSpec.topicPartition(), new KafkaException("WriteTxnMarkers response " +
+                "included unexpected partition entries for topic " + abortSpec.topicPartition().topic() +
+                ": " + markerResponses + "(expected to find exactly one entry with partition " +
+                abortSpec.topicPartition().partition() + ")"));
+        }
+
+        WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult partitionResponse = partitionResponses.get(0);
+        Errors error = Errors.forCode(partitionResponse.errorCode());
+
+        if (error != Errors.NONE) {
+            return handleError(error);
+        } else {
+            return ApiResult.completed(abortSpec.topicPartition(), null);
+        }
+    }
+
+    private ApiResult<TopicPartition, Void> handleError(Errors error) {
+        switch (error) {
+            case CLUSTER_AUTHORIZATION_FAILED:
+                log.error("WriteTxnMarkers request for abort spec {} failed cluster authorization", abortSpec);
+                return ApiResult.failed(abortSpec.topicPartition(), new ClusterAuthorizationException(
+                    "WriteTxnMarkers request with " + abortSpec + " failed due to cluster " +
+                        "authorization error."));

Review comment:
       nit: You've put a `.` at the end of this exception message but not on the others below. I would either add it everywhere or remove it here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AbortTransactionResult.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+@InterfaceStability.Evolving
+public class AbortTransactionResult {
+    private final Map<TopicPartition, KafkaFutureImpl<Void>> futures;
+
+    AbortTransactionResult(Map<TopicPartition, KafkaFutureImpl<Void>> futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<Void> all() {

Review comment:
       I wonder if we should also add a method to access the result of an individual partition. `all()` will fail if any of the partitions failed, but the user does not know which one and has no way to inspect the results of the other partitions. What do you think?
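       One possible shape, following the pattern of other admin `*Result` classes. The method name and the choice of exception below are just a sketch, not part of this PR:

```java
public KafkaFuture<Void> partitionResult(TopicPartition partition) {
    // Sketch: expose the per-partition future so callers can see which partition failed.
    KafkaFutureImpl<Void> future = futures.get(partition);
    if (future == null) {
        throw new IllegalArgumentException("Partition " + partition +
            " was not included in the AbortTransactionSpec");
    }
    return future;
}
```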




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org