You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/12 22:05:46 UTC
kafka git commit: KAFKA-5427; Transactional producer should allow FindCoordinator in error state
Repository: kafka
Updated Branches:
refs/heads/trunk 2724053ad -> 43e935a63
KAFKA-5427; Transactional producer should allow FindCoordinator in error state
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3297 from hachikuji/KAFKA-5427
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43e935a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43e935a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43e935a6
Branch: refs/heads/trunk
Commit: 43e935a630eb0a7fa64c5a1a38bfee17f9b724dc
Parents: 2724053
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 15:04:05 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 15:04:05 2017 -0700
----------------------------------------------------------------------
.../producer/internals/TransactionManager.java | 32 +++--
.../internals/TransactionManagerTest.java | 124 ++++++++++++++++++-
2 files changed, 133 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 821c56b..a26c3b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -97,8 +97,7 @@ public class TransactionManager {
case INITIALIZING:
return source == UNINITIALIZED;
case READY:
- return source == INITIALIZING || source == COMMITTING_TRANSACTION
- || source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
+ return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
case IN_TRANSACTION:
return source == READY;
case COMMITTING_TRANSACTION:
@@ -106,8 +105,7 @@ public class TransactionManager {
case ABORTING_TRANSACTION:
return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
case ABORTABLE_ERROR:
- return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
- || source == ABORTABLE_ERROR;
+ return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
case FATAL_ERROR:
default:
// We can transition to FATAL_ERROR unconditionally.
@@ -179,7 +177,7 @@ public class TransactionManager {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
- return beginCompletingTransaction(true);
+ return beginCompletingTransaction(TransactionResult.COMMIT);
}
public synchronized TransactionalRequestResult beginAbortingTransaction() {
@@ -190,14 +188,12 @@ public class TransactionManager {
// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
- return beginCompletingTransaction(false);
+ return beginCompletingTransaction(TransactionResult.ABORT);
}
- private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
+ private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
-
- TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
@@ -225,7 +221,7 @@ public class TransactionManager {
if (currentState != State.IN_TRANSACTION)
throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
- if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
+ if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;
log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
@@ -286,6 +282,11 @@ public class TransactionManager {
}
synchronized void transitionToAbortableError(RuntimeException exception) {
+ if (currentState == State.ABORTING_TRANSACTION) {
+ log.debug("Skipping transition to abortable error state since the transaction is already being " +
+ "aborted. Underlying exception: ", exception);
+ return;
+ }
transitionTo(State.ABORTABLE_ERROR, exception);
}
@@ -504,13 +505,10 @@ public class TransactionManager {
private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
if (hasError()) {
- if (requestHandler instanceof EndTxnHandler) {
- // we allow abort requests to break out of the error state. The state and the last error
- // will be cleared when the request returns
- EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
- if (endTxnHandler.builder.result() == TransactionResult.ABORT)
- return false;
- }
+ if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler)
+ // No harm letting the FindCoordinator request go through if we're expecting to abort
+ return false;
+
requestHandler.fail(lastError);
return true;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8d5dbe9..c4abd3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1059,6 +1060,71 @@ public class TransactionManagerTest {
}
@Test
+ public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+ assertFalse(responseFuture.isDone());
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+
+ sender.run(time.milliseconds()); // Send AddPartitionsRequest
+ sender.run(time.milliseconds()); // Send Produce Request
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ assertTrue(transactionManager.isAborting());
+ assertFalse(transactionManager.hasError());
+
+ sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+ sender.run(time.milliseconds()); // receive the produce response
+
+ // we do not transition to ABORTABLE_ERROR since we were already aborting
+ assertTrue(transactionManager.isAborting());
+ assertFalse(transactionManager.hasError());
+
+ sender.run(time.milliseconds()); // handle the abort
+ assertTrue(abortResult.isCompleted());
+ assertTrue(abortResult.isSuccessful());
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
+ }
+
+ @Test
+ public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+ assertFalse(responseFuture.isDone());
+ sender.run(time.milliseconds()); // Send AddPartitionsRequest
+
+ transactionManager.transitionToAbortableError(new KafkaException());
+ sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
+ sender.run(time.milliseconds()); // AddPartitions returns
+ assertTrue(transactionManager.hasAbortableError());
+
+ assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
+ prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+ sender.run(time.milliseconds()); // FindCoordinator handled
+ assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+ assertTrue(transactionManager.hasAbortableError());
+ }
+
+ @Test
public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
@@ -1279,16 +1345,43 @@ public class TransactionManagerTest {
TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
- prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
+ prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
assertFalse(abortResult.isCompleted());
sender.run(time.milliseconds());
+ assertTrue(transactionManager.isReady());
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
}
@Test
+ public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp1, new OffsetAndMetadata(1));
+ final String consumerGroupId = "myconsumergroup";
+
+ transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+ prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
+ sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
+ assertFalse(abortResult.isCompleted());
+
+ sender.run(time.milliseconds());
+ assertTrue(abortResult.isCompleted());
+ assertFalse(abortResult.isSuccessful());
+ assertTrue(transactionManager.hasFatalError());
+ }
+
+ @Test
public void testNoDrainWhenPartitionsPending() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
@@ -1623,8 +1716,15 @@ public class TransactionManagerTest {
}, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
}
+ private void sendProduceResponse(Errors error, final long pid, final short epoch) {
+ client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+ }
+
private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
- client.prepareResponse(new MockClient.RequestMatcher() {
+ client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+ }
+ private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
+ return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
ProduceRequest produceRequest = (ProduceRequest) body;
@@ -1640,12 +1740,24 @@ public class TransactionManagerTest {
assertEquals(transactionalId, produceRequest.transactionalId());
return true;
}
- }, produceResponse(tp0, 0, error, 0));
+ };
+ }
+ private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
+ final short epoch, final long pid) {
+ client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid),
+ new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
}
- private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, final short epoch, final long pid) {
- client.prepareResponse(new MockClient.RequestMatcher() {
+ private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
+ final short epoch, final long pid) {
+ client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
+ new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
+ }
+
+ private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
+ final short epoch, final long pid) {
+ return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
@@ -1655,7 +1767,7 @@ public class TransactionManagerTest {
assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
return true;
}
- }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
+ };
}
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {