You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ch...@apache.org on 2024/03/27 19:13:38 UTC
(kafka) branch trunk updated: MINOR: Renaming the `Abortable_Transaction` error to `Transaction_Abortable` (#15609)
This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cf1ba099c07 MINOR: Renaming the `Abortable_Transaction` error to `Transaction_Abortable` (#15609)
cf1ba099c07 is described below
commit cf1ba099c0723f9cf65dda4cd334d36b7ede6327
Author: Sanskar Jhajharia <12...@users.noreply.github.com>
AuthorDate: Thu Mar 28 00:43:32 2024 +0530
MINOR: Renaming the `Abortable_Transaction` error to `Transaction_Abortable` (#15609)
This is a follow-up to this PR (https://github.com/apache/kafka/pull/15486) which introduced the new ABORTABLE_TRANSACTION error as a part of KIP-890 efforts. However on further discussion, we seem to gain consensus that the error should be rather named as TRANSACTION_ABORTABLE.
This PR aims to address the same. There are no changes in the code apart from that.
Reviewers: Justine Olshan <jo...@confluent.io>, Igor Soarez <so...@apple.com>, Chia-Ping Tsai <ch...@gmail.com>
---
.../producer/internals/TransactionManager.java | 12 ++---
...ion.java => TransactionAbortableException.java} | 4 +-
.../org/apache/kafka/common/protocol/Errors.java | 4 +-
.../common/message/AddOffsetsToTxnRequest.json | 2 +-
.../common/message/AddOffsetsToTxnResponse.json | 2 +-
.../common/message/AddPartitionsToTxnRequest.json | 2 +-
.../common/message/AddPartitionsToTxnResponse.json | 2 +-
.../resources/common/message/EndTxnRequest.json | 2 +-
.../resources/common/message/EndTxnResponse.json | 2 +-
.../common/message/FindCoordinatorRequest.json | 2 +-
.../common/message/FindCoordinatorResponse.json | 2 +-
.../common/message/InitProducerIdRequest.json | 2 +-
.../common/message/InitProducerIdResponse.json | 2 +-
.../resources/common/message/ProduceRequest.json | 2 +-
.../resources/common/message/ProduceResponse.json | 2 +-
.../common/message/TxnOffsetCommitRequest.json | 2 +-
.../common/message/TxnOffsetCommitResponse.json | 2 +-
.../clients/producer/internals/SenderTest.java | 12 ++---
.../producer/internals/TransactionManagerTest.java | 54 +++++++++++-----------
.../transaction/TransactionCoordinator.scala | 2 +-
.../kafka/server/AddPartitionsToTxnManager.scala | 4 +-
.../transaction/TransactionCoordinatorTest.scala | 4 +-
.../server/AddPartitionsToTxnManagerTest.scala | 28 +++++------
.../AddPartitionsToTxnRequestServerTest.scala | 2 +-
24 files changed, 77 insertions(+), 77 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 4ff54b17598..fab24c5e5e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1330,7 +1330,7 @@ public class TransactionManager {
// We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
- } else if (error == Errors.ABORTABLE_TRANSACTION) {
+ } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
} else {
fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
@@ -1401,7 +1401,7 @@ public class TransactionManager {
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception());
return;
- } else if (error == Errors.ABORTABLE_TRANSACTION) {
+ } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
return;
} else {
@@ -1507,7 +1507,7 @@ public class TransactionManager {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(key));
- } else if (error == Errors.ABORTABLE_TRANSACTION) {
+ } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
} else {
fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due to " +
@@ -1562,7 +1562,7 @@ public class TransactionManager {
fatalError(error.exception());
} else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) {
abortableErrorIfPossible(error.exception());
- } else if (error == Errors.ABORTABLE_TRANSACTION) {
+ } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
} else {
fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
@@ -1622,7 +1622,7 @@ public class TransactionManager {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
- } else if (error == Errors.ABORTABLE_TRANSACTION) {
+ } else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
} else {
fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
@@ -1687,7 +1687,7 @@ public class TransactionManager {
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
break;
} else if (error == Errors.FENCED_INSTANCE_ID ||
- error == Errors.ABORTABLE_TRANSACTION) {
+ error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
break;
} else if (error == Errors.UNKNOWN_MEMBER_ID
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
similarity index 87%
rename from clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java
rename to clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
index cfea649e750..aa592d552bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/AbortableTransactionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionAbortableException.java
@@ -16,8 +16,8 @@
*/
package org.apache.kafka.common.errors;
-public class AbortableTransactionException extends ApiException {
- public AbortableTransactionException(String message) {
+public class TransactionAbortableException extends ApiException {
+ public TransactionAbortableException(String message) {
super(message);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 30bc250c6de..900d191c8f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -138,7 +138,7 @@ import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.errors.AbortableTransactionException;
+import org.apache.kafka.common.errors.TransactionAbortableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -394,7 +394,7 @@ public enum Errors {
UNKNOWN_SUBSCRIPTION_ID(117, "Client sent a push telemetry request with an invalid or outdated subscription ID.", UnknownSubscriptionIdException::new),
TELEMETRY_TOO_LARGE(118, "Client sent a push telemetry request larger than the maximum size the broker will accept.", TelemetryTooLargeException::new),
INVALID_REGISTRATION(119, "The controller has considered the broker registration to be invalid.", InvalidRegistrationException::new),
- ABORTABLE_TRANSACTION(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", AbortableTransactionException::new);
+ TRANSACTION_ABORTABLE(120, "The server encountered an error with the transaction. The client can abort the transaction to continue using this transactional ID.", TransactionAbortableException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index 9d7b63c3133..157ae20c0a4 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -24,7 +24,7 @@
//
// Version 3 enables flexible versions.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json b/clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json
index 6b3b1c481d6..6a713fea1af 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnResponse.json
@@ -23,7 +23,7 @@
//
// Version 3 enables flexible versions.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 139d1436a65..2270f7a8469 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -26,7 +26,7 @@
//
// Version 4 adds VerifyOnly field to check if partitions are already in transaction and adds support to batch multiple transactions.
//
- // Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
// Versions 3 and below will be exclusively used by clients and versions 4 and above will be used by brokers.
"latestVersionUnstable": false,
"validVersions": "0-5",
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json b/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
index a2af388dba5..6c4eefd2cc0 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
@@ -25,7 +25,7 @@
//
// Version 4 adds support to batch multiple transactions and a top level error code.
//
- // Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json
index 5bf6c577343..bc66adcf50a 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -24,7 +24,7 @@
//
// Version 3 enables flexible versions.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/EndTxnResponse.json b/clients/src/main/resources/common/message/EndTxnResponse.json
index 53b0250f808..08ac6cddd38 100644
--- a/clients/src/main/resources/common/message/EndTxnResponse.json
+++ b/clients/src/main/resources/common/message/EndTxnResponse.json
@@ -23,7 +23,7 @@
//
// Version 3 enables flexible versions.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/FindCoordinatorRequest.json b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
index e6786f5b10e..42b2f4c891a 100644
--- a/clients/src/main/resources/common/message/FindCoordinatorRequest.json
+++ b/clients/src/main/resources/common/message/FindCoordinatorRequest.json
@@ -26,7 +26,7 @@
//
// Version 4 adds support for batching via CoordinatorKeys (KIP-699)
//
- // Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
"deprecatedVersions": "0",
"flexibleVersions": "3+",
diff --git a/clients/src/main/resources/common/message/FindCoordinatorResponse.json b/clients/src/main/resources/common/message/FindCoordinatorResponse.json
index a744a1928d6..860d655a252 100644
--- a/clients/src/main/resources/common/message/FindCoordinatorResponse.json
+++ b/clients/src/main/resources/common/message/FindCoordinatorResponse.json
@@ -25,7 +25,7 @@
//
// Version 4 adds support for batching via Coordinators (KIP-699)
//
- // Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index 92e6a6b2537..39f546dbc04 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -26,7 +26,7 @@
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
//
- // Verison 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Verison 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "2+",
"fields": [
diff --git a/clients/src/main/resources/common/message/InitProducerIdResponse.json b/clients/src/main/resources/common/message/InitProducerIdResponse.json
index c0f10b2e851..c5dfec6e321 100644
--- a/clients/src/main/resources/common/message/InitProducerIdResponse.json
+++ b/clients/src/main/resources/common/message/InitProducerIdResponse.json
@@ -25,7 +25,7 @@
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
//
- // Version 5 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 5 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-5",
"flexibleVersions": "2+",
"fields": [
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json
index d396d66070d..ae01fe5c8c0 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -36,7 +36,7 @@
//
// Version 10 is the same as version 9 (KIP-951).
//
- // Version 11 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
"deprecatedVersions": "0-6",
"flexibleVersions": "9+",
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json
index 1c097d3bc3b..92c7a2223da 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -35,7 +35,7 @@
//
// Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
//
- // Version 11 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-11",
"flexibleVersions": "9+",
"fields": [
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 1df18c64b52..3cb63aa8fb8 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -24,7 +24,7 @@
//
// Version 3 adds the member.id, group.instance.id and generation.id.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
index 0f6c1f27241..1a04cef9d5e 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
@@ -23,7 +23,7 @@
//
// Version 3 adds illegal generation, fenced instance id, and unknown member id errors.
//
- // Version 4 adds support for new error code ABORTABLE_TRANSACTION (KIP-890).
+ // Version 4 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
"validVersions": "0-4",
"flexibleVersions": "3+",
"fields": [
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b9c734a2cb1..4ef9dab4d09 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -42,7 +42,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.errors.AbortableTransactionException;
+import org.apache.kafka.common.errors.TransactionAbortableException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.message.ApiMessageType;
@@ -3149,10 +3149,10 @@ public class SenderTest {
}
@Test
- public void testAbortableTxnExceptionIsAnAbortableError() throws Exception {
+ public void testTransactionAbortablenExceptionIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext, "textAbortableTxnException", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext, "textTransactionAbortableException", 60000, 100, apiVersions);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3164,11 +3164,11 @@ public class SenderTest {
Future<RecordMetadata> request = appendToAccumulator(tp0);
sender.runOnce(); // send request
- sendIdempotentProducerResponse(0, tp0, Errors.ABORTABLE_TRANSACTION, -1);
+ sendIdempotentProducerResponse(0, tp0, Errors.TRANSACTION_ABORTABLE, -1);
- // Return AbortableTransactionException error. It should be abortable.
+ // Return TransactionAbortableException error. It should be abortable.
sender.runOnce();
- assertFutureFailure(request, AbortableTransactionException.class);
+ assertFutureFailure(request, TransactionAbortableException.class);
assertTrue(txnManager.hasAbortableError());
TransactionalRequestResult result = txnManager.beginAbort();
sender.runOnce();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4d8c445be0a..51299ad337e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -40,7 +40,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.errors.AbortableTransactionException;
+import org.apache.kafka.common.errors.TransactionAbortableException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
@@ -3516,22 +3516,22 @@ public class TransactionManagerTest {
}
@Test
- public void testAbortableTransactionExceptionInInitProducerId() {
+ public void testTransactionAbortableExceptionInInitProducerId() {
TransactionalRequestResult initPidResult = transactionManager.initializeTransactions();
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
- prepareInitPidResponse(Errors.ABORTABLE_TRANSACTION, false, producerId, RecordBatch.NO_PRODUCER_EPOCH);
+ prepareInitPidResponse(Errors.TRANSACTION_ABORTABLE, false, producerId, RecordBatch.NO_PRODUCER_EPOCH);
runUntil(transactionManager::hasError);
assertTrue(initPidResult.isCompleted());
assertFalse(initPidResult.isSuccessful());
- assertThrows(AbortableTransactionException.class, initPidResult::await);
- assertAbortableError(AbortableTransactionException.class);
+ assertThrows(TransactionAbortableException.class, initPidResult::await);
+ assertAbortableError(TransactionAbortableException.class);
}
@Test
- public void testAbortableTransactionExceptionInAddPartitions() {
+ public void testTransactionAbortableExceptionInAddPartitions() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
@@ -3539,15 +3539,15 @@ public class TransactionManagerTest {
transactionManager.beginTransaction();
transactionManager.maybeAddPartition(tp);
- prepareAddPartitionsToTxn(tp, Errors.ABORTABLE_TRANSACTION);
+ prepareAddPartitionsToTxn(tp, Errors.TRANSACTION_ABORTABLE);
runUntil(transactionManager::hasError);
- assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
+ assertTrue(transactionManager.lastError() instanceof TransactionAbortableException);
- assertAbortableError(AbortableTransactionException.class);
+ assertAbortableError(TransactionAbortableException.class);
}
@Test
- public void testAbortableTransactionExceptionInFindCoordinator() {
+ public void testTransactionAbortableExceptionInFindCoordinator() {
doInitTransactions();
transactionManager.beginTransaction();
@@ -3557,19 +3557,19 @@ public class TransactionManagerTest {
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
runUntil(() -> !transactionManager.hasPartitionsToAdd());
- prepareFindCoordinatorResponse(Errors.ABORTABLE_TRANSACTION, false, CoordinatorType.GROUP, consumerGroupId);
+ prepareFindCoordinatorResponse(Errors.TRANSACTION_ABORTABLE, false, CoordinatorType.GROUP, consumerGroupId);
runUntil(transactionManager::hasError);
- assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
+ assertTrue(transactionManager.lastError() instanceof TransactionAbortableException);
runUntil(sendOffsetsResult::isCompleted);
assertFalse(sendOffsetsResult.isSuccessful());
- assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
+ assertTrue(sendOffsetsResult.error() instanceof TransactionAbortableException);
- assertAbortableError(AbortableTransactionException.class);
+ assertAbortableError(TransactionAbortableException.class);
}
@Test
- public void testAbortableTransactionExceptionInEndTxn() throws InterruptedException {
+ public void testTransactionAbortableExceptionInEndTxn() throws InterruptedException {
doInitTransactions();
transactionManager.beginTransaction();
@@ -3581,7 +3581,7 @@ public class TransactionManagerTest {
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
prepareProduceResponse(Errors.NONE, producerId, epoch);
- prepareEndTxnResponse(Errors.ABORTABLE_TRANSACTION, TransactionResult.COMMIT, producerId, epoch);
+ prepareEndTxnResponse(Errors.TRANSACTION_ABORTABLE, TransactionResult.COMMIT, producerId, epoch);
runUntil(commitResult::isCompleted);
runUntil(responseFuture::isDone);
@@ -3590,11 +3590,11 @@ public class TransactionManagerTest {
assertFalse(commitResult.isSuccessful());
assertTrue(commitResult.isAcked());
- assertAbortableError(AbortableTransactionException.class);
+ assertAbortableError(TransactionAbortableException.class);
}
@Test
- public void testAbortableTransactionExceptionInAddOffsetsToTxn() {
+ public void testTransactionAbortableExceptionInAddOffsetsToTxn() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
@@ -3603,18 +3603,18 @@ public class TransactionManagerTest {
TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction(
singletonMap(tp, new OffsetAndMetadata(39L)), new ConsumerGroupMetadata(consumerGroupId));
- prepareAddOffsetsToTxnResponse(Errors.ABORTABLE_TRANSACTION, consumerGroupId, producerId, epoch);
+ prepareAddOffsetsToTxnResponse(Errors.TRANSACTION_ABORTABLE, consumerGroupId, producerId, epoch);
runUntil(transactionManager::hasError);
- assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
+ assertTrue(transactionManager.lastError() instanceof TransactionAbortableException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
- assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
+ assertTrue(sendOffsetsResult.error() instanceof TransactionAbortableException);
- assertAbortableError(AbortableTransactionException.class);
+ assertAbortableError(TransactionAbortableException.class);
}
@Test
- public void testAbortableTransactionExceptionInTxnOffsetCommit() {
+ public void testTransactionAbortableExceptionInTxnOffsetCommit() {
final TopicPartition tp = new TopicPartition("foo", 0);
doInitTransactions();
@@ -3625,14 +3625,14 @@ public class TransactionManagerTest {
prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, producerId, epoch);
prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId);
- prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, singletonMap(tp, Errors.ABORTABLE_TRANSACTION));
+ prepareTxnOffsetCommitResponse(consumerGroupId, producerId, epoch, singletonMap(tp, Errors.TRANSACTION_ABORTABLE));
runUntil(transactionManager::hasError);
- assertTrue(transactionManager.lastError() instanceof AbortableTransactionException);
+ assertTrue(transactionManager.lastError() instanceof TransactionAbortableException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
- assertTrue(sendOffsetsResult.error() instanceof AbortableTransactionException);
- assertAbortableError(AbortableTransactionException.class);
+ assertTrue(sendOffsetsResult.error() instanceof TransactionAbortableException);
+ assertAbortableError(TransactionAbortableException.class);
}
private FutureRecordMetadata appendToAccumulator(TopicPartition tp) throws InterruptedException {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 4d9ba7fa558..aca2e601acc 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -369,7 +369,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
if (txnMetadata.topicPartitions.contains(part))
(part, Errors.NONE)
else
- (part, Errors.ABORTABLE_TRANSACTION)
+ (part, Errors.TRANSACTION_ABORTABLE)
}.toMap)
}
}
diff --git a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
index ec909580a84..3aad9b74acc 100644
--- a/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
+++ b/core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala
@@ -45,7 +45,7 @@ object AddPartitionsToTxnManager {
/**
* This is an enum which handles the Partition Response based on the Request Version and the exact operation
* defaultError: This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than the first version supporting the new Error Class
- * genericError: This maps to the case when the clients are updated to handle the AbortableTxnException
+ * genericError: This maps to the case when the clients are updated to handle the TransactionAbortableException
* addPartition: This is a WIP. To be updated as a part of KIP-890 Part 2
*/
sealed trait SupportedOperation
@@ -226,7 +226,7 @@ class AddPartitionsToTxnManager(
val code =
if (partitionResult.partitionErrorCode == Errors.PRODUCER_FENCED.code)
Errors.INVALID_PRODUCER_EPOCH.code
- else if (partitionResult.partitionErrorCode() == Errors.ABORTABLE_TRANSACTION.code && transactionDataAndCallbacks.supportedOperation != genericError) // For backward compatibility with clients.
+ else if (partitionResult.partitionErrorCode() == Errors.TRANSACTION_ABORTABLE.code && transactionDataAndCallbacks.supportedOperation != genericError) // For backward compatibility with clients.
Errors.INVALID_TXN_STATE.code
else
partitionResult.partitionErrorCode
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index 1da8d664801..ddcaa4ca8bb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -283,7 +283,7 @@ class TransactionCoordinatorTest {
coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, partitions, verifyPartitionsInTxnCallback)
errors.foreach { case (_, error) =>
- assertEquals(Errors.ABORTABLE_TRANSACTION, error)
+ assertEquals(Errors.TRANSACTION_ABORTABLE, error)
}
}
@@ -399,7 +399,7 @@ class TransactionCoordinatorTest {
val extraPartitions = partitions ++ Set(new TopicPartition("topic2", 0))
coordinator.handleVerifyPartitionsInTransaction(transactionalId, 0L, 0, extraPartitions, verifyPartitionsInTxnCallback)
- assertEquals(Errors.ABORTABLE_TRANSACTION, errors(new TopicPartition("topic2", 0)))
+ assertEquals(Errors.TRANSACTION_ABORTABLE, errors(new TopicPartition("topic2", 0)))
assertEquals(Errors.NONE, errors(new TopicPartition("topic1", 0)))
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
}
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
index 5ee9cf83026..a2569647092 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnManagerTest.scala
@@ -299,31 +299,31 @@ class AddPartitionsToTxnManagerTest {
assertEquals(expectedTransaction1Errors, transaction1Errors)
assertEquals(expectedTransaction2Errors, transaction2Errors)
- val preConvertedAbortableTransaction1Errors = topicPartitions.map(_ -> Errors.ABORTABLE_TRANSACTION).toMap
- val preConvertedAbortableTransaction2Errors = Map(new TopicPartition("foo", 1) -> Errors.NONE,
- new TopicPartition("foo", 2) -> Errors.ABORTABLE_TRANSACTION,
+ val preConvertedTransactionAbortableErrorsTxn1 = topicPartitions.map(_ -> Errors.TRANSACTION_ABORTABLE).toMap
+ val preConvertedTransactionAbortableErrorsTxn2 = Map(new TopicPartition("foo", 1) -> Errors.NONE,
+ new TopicPartition("foo", 2) -> Errors.TRANSACTION_ABORTABLE,
new TopicPartition("foo", 3) -> Errors.NONE)
- val abortableTransaction1ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId1, preConvertedAbortableTransaction1Errors.asJava)
- val abortableTransaction2ErrorResponse = AddPartitionsToTxnResponse.resultForTransaction(transactionalId2, preConvertedAbortableTransaction2Errors.asJava)
+ val transactionAbortableErrorResponseTxn1 = AddPartitionsToTxnResponse.resultForTransaction(transactionalId1, preConvertedTransactionAbortableErrorsTxn1.asJava)
+ val transactionAbortableErrorResponseTxn2 = AddPartitionsToTxnResponse.resultForTransaction(transactionalId2, preConvertedTransactionAbortableErrorsTxn2.asJava)
val mixedErrorsAddPartitionsResponseAbortableError = new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData()
- .setResultsByTransaction(new AddPartitionsToTxnResultCollection(Seq(abortableTransaction1ErrorResponse, abortableTransaction2ErrorResponse).iterator.asJava)))
+ .setResultsByTransaction(new AddPartitionsToTxnResultCollection(Seq(transactionAbortableErrorResponseTxn1, transactionAbortableErrorResponseTxn2).iterator.asJava)))
val mixedAbortableErrorsResponse = clientResponse(mixedErrorsAddPartitionsResponseAbortableError)
- val expectedAbortableTransaction1ErrorsLowerVersion = topicPartitions.map(_ -> Errors.INVALID_TXN_STATE).toMap
- val expectedAbortableTransaction2ErrorsLowerVersion = Map(new TopicPartition("foo", 2) -> Errors.INVALID_TXN_STATE)
+ val expectedTransactionAbortableErrorsTxn1LowerVersion = topicPartitions.map(_ -> Errors.INVALID_TXN_STATE).toMap
+ val expectedTransactionAbortableErrorsTxn2LowerVersion = Map(new TopicPartition("foo", 2) -> Errors.INVALID_TXN_STATE)
- val expectedAbortableTransaction1ErrorsHigherVersion = topicPartitions.map(_ -> Errors.ABORTABLE_TRANSACTION).toMap
- val expectedAbortableTransaction2ErrorsHigherVersion = Map(new TopicPartition("foo", 2) -> Errors.ABORTABLE_TRANSACTION)
+ val expectedTransactionAbortableErrorsTxn1HigherVersion = topicPartitions.map(_ -> Errors.TRANSACTION_ABORTABLE).toMap
+ val expectedTransactionAbortableErrorsTxn2HigherVersion = Map(new TopicPartition("foo", 2) -> Errors.TRANSACTION_ABORTABLE)
addTransactionsToVerifyRequestVersion(defaultError)
receiveResponse(mixedAbortableErrorsResponse)
- assertEquals(expectedAbortableTransaction1ErrorsLowerVersion, transaction1Errors)
- assertEquals(expectedAbortableTransaction2ErrorsLowerVersion, transaction2Errors)
+ assertEquals(expectedTransactionAbortableErrorsTxn1LowerVersion, transaction1Errors)
+ assertEquals(expectedTransactionAbortableErrorsTxn2LowerVersion, transaction2Errors)
addTransactionsToVerifyRequestVersion(genericError)
receiveResponse(mixedAbortableErrorsResponse)
- assertEquals(expectedAbortableTransaction1ErrorsHigherVersion, transaction1Errors)
- assertEquals(expectedAbortableTransaction2ErrorsHigherVersion, transaction2Errors)
+ assertEquals(expectedTransactionAbortableErrorsTxn1HigherVersion, transaction1Errors)
+ assertEquals(expectedTransactionAbortableErrorsTxn2HigherVersion, transaction2Errors)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
index bc3da1852ab..6daa5e80b3c 100644
--- a/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
@@ -163,7 +163,7 @@ class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {
val verifyErrors = verifyResponse.errors()
- assertEquals(Collections.singletonMap(transactionalId, Collections.singletonMap(tp0, Errors.ABORTABLE_TRANSACTION)), verifyErrors)
+ assertEquals(Collections.singletonMap(transactionalId, Collections.singletonMap(tp0, Errors.TRANSACTION_ABORTABLE)), verifyErrors)
}
private def setUpTransactions(transactionalId: String, verifyOnly: Boolean, partitions: Set[TopicPartition]): (Int, AddPartitionsToTxnTransaction) = {