You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/06/12 22:05:46 UTC

kafka git commit: KAFKA-5427; Transactional producer should allow FindCoordinator in error state

Repository: kafka
Updated Branches:
  refs/heads/trunk 2724053ad -> 43e935a63


KAFKA-5427; Transactional producer should allow FindCoordinator in error state

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #3297 from hachikuji/KAFKA-5427


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/43e935a6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/43e935a6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/43e935a6

Branch: refs/heads/trunk
Commit: 43e935a630eb0a7fa64c5a1a38bfee17f9b724dc
Parents: 2724053
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 15:04:05 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 15:04:05 2017 -0700

----------------------------------------------------------------------
 .../producer/internals/TransactionManager.java  |  32 +++--
 .../internals/TransactionManagerTest.java       | 124 ++++++++++++++++++-
 2 files changed, 133 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 821c56b..a26c3b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -97,8 +97,7 @@ public class TransactionManager {
                 case INITIALIZING:
                     return source == UNINITIALIZED;
                 case READY:
-                    return source == INITIALIZING || source == COMMITTING_TRANSACTION
-                            || source == ABORTING_TRANSACTION || source == ABORTABLE_ERROR;
+                    return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
                 case IN_TRANSACTION:
                     return source == READY;
                 case COMMITTING_TRANSACTION:
@@ -106,8 +105,7 @@ public class TransactionManager {
                 case ABORTING_TRANSACTION:
                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
-                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION
-                            || source == ABORTABLE_ERROR;
+                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
                 case FATAL_ERROR:
                 default:
                     // We can transition to FATAL_ERROR unconditionally.
@@ -179,7 +177,7 @@ public class TransactionManager {
         ensureTransactional();
         maybeFailWithError();
         transitionTo(State.COMMITTING_TRANSACTION);
-        return beginCompletingTransaction(true);
+        return beginCompletingTransaction(TransactionResult.COMMIT);
     }
 
     public synchronized TransactionalRequestResult beginAbortingTransaction() {
@@ -190,14 +188,12 @@ public class TransactionManager {
 
         // We're aborting the transaction, so there should be no need to add new partitions
         newPartitionsInTransaction.clear();
-        return beginCompletingTransaction(false);
+        return beginCompletingTransaction(TransactionResult.ABORT);
     }
 
-    private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
+    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
-
-        TransactionResult transactionResult = isCommit ? TransactionResult.COMMIT : TransactionResult.ABORT;
         EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
                 producerIdAndEpoch.epoch, transactionResult);
         EndTxnHandler handler = new EndTxnHandler(builder);
@@ -225,7 +221,7 @@ public class TransactionManager {
         if (currentState != State.IN_TRANSACTION)
             throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
 
-        if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
+        if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
             return;
 
         log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
@@ -286,6 +282,11 @@ public class TransactionManager {
     }
 
     synchronized void transitionToAbortableError(RuntimeException exception) {
+        if (currentState == State.ABORTING_TRANSACTION) {
+            log.debug("Skipping transition to abortable error state since the transaction is already being " +
+                    "aborted. Underlying exception: ", exception);
+            return;
+        }
         transitionTo(State.ABORTABLE_ERROR, exception);
     }
 
@@ -504,13 +505,10 @@ public class TransactionManager {
 
     private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
         if (hasError()) {
-            if (requestHandler instanceof EndTxnHandler) {
-                // we allow abort requests to break out of the error state. The state and the last error
-                // will be cleared when the request returns
-                EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
-                if (endTxnHandler.builder.result() == TransactionResult.ABORT)
-                    return false;
-            }
+            if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler)
+                // No harm letting the FindCoordinator request go through if we're expecting to abort
+                return false;
+
             requestHandler.fail(lastError);
             return true;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/43e935a6/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8d5dbe9..c4abd3c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -80,6 +80,7 @@ import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1059,6 +1060,71 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testAbortableErrorWhileAbortInProgress() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        sender.run(time.milliseconds());  // Send Produce Request
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        assertTrue(transactionManager.isAborting());
+        assertFalse(transactionManager.hasError());
+
+        sendProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        sender.run(time.milliseconds());  // receive the produce response
+
+        // we do not transition to ABORTABLE_ERROR since we were already aborting
+        assertTrue(transactionManager.isAborting());
+        assertFalse(transactionManager.hasError());
+
+        sender.run(time.milliseconds());  // handle the abort
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReady());  // make sure we are ready for a transaction now.
+    }
+
+    @Test
+    public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        assertFalse(responseFuture.isDone());
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        sendAddPartitionsToTxnResponse(Errors.NOT_COORDINATOR, tp0, epoch, pid);
+        sender.run(time.milliseconds()); // AddPartitions returns
+        assertTrue(transactionManager.hasAbortableError());
+
+        assertNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds()); // FindCoordinator handled
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+        assertTrue(transactionManager.hasAbortableError());
+    }
+
+    @Test
     public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1279,16 +1345,43 @@ public class TransactionManagerTest {
 
         TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
 
-        prepareAddOffsetsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
+        prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
         sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
         assertFalse(abortResult.isCompleted());
 
         sender.run(time.milliseconds());
+        assertTrue(transactionManager.isReady());
         assertTrue(abortResult.isCompleted());
         assertTrue(abortResult.isSuccessful());
     }
 
     @Test
+    public void shouldFailAbortIfAddOffsetsFailsWithFatalError() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp1, new OffsetAndMetadata(1));
+        final String consumerGroupId = "myconsumergroup";
+
+        transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
+        sender.run(time.milliseconds());  // Send AddOffsetsToTxnRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());
+        assertTrue(abortResult.isCompleted());
+        assertFalse(abortResult.isSuccessful());
+        assertTrue(transactionManager.hasFatalError());
+    }
+
+    @Test
     public void testNoDrainWhenPartitionsPending() throws InterruptedException {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1623,8 +1716,15 @@ public class TransactionManagerTest {
         }, new InitProducerIdResponse(0, error, pid, epoch), shouldDisconnect);
     }
 
+    private void sendProduceResponse(Errors error, final long pid, final short epoch) {
+        client.respond(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    }
+
     private void prepareProduceResponse(Errors error, final long pid, final short epoch) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
+        client.prepareResponse(produceRequestMatcher(pid, epoch), produceResponse(tp0, 0, error, 0));
+    }
+    private MockClient.RequestMatcher produceRequestMatcher(final long pid, final short epoch) {
+        return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 ProduceRequest produceRequest = (ProduceRequest) body;
@@ -1640,12 +1740,24 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, produceRequest.transactionalId());
                 return true;
             }
-        }, produceResponse(tp0, 0, error, 0));
+        };
+    }
 
+    private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
+                                                   final short epoch, final long pid) {
+        client.prepareResponse(addPartitionsRequestMatcher(topicPartition, epoch, pid),
+                new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
     }
 
-    private void prepareAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition, final short epoch, final long pid) {
-        client.prepareResponse(new MockClient.RequestMatcher() {
+    private void sendAddPartitionsToTxnResponse(Errors error, final TopicPartition topicPartition,
+                                                final short epoch, final long pid) {
+        client.respond(addPartitionsRequestMatcher(topicPartition, epoch, pid),
+                new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
+    }
+
+    private MockClient.RequestMatcher addPartitionsRequestMatcher(final TopicPartition topicPartition,
+                                                                  final short epoch, final long pid) {
+        return new MockClient.RequestMatcher() {
             @Override
             public boolean matches(AbstractRequest body) {
                 AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
@@ -1655,7 +1767,7 @@ public class TransactionManagerTest {
                 assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
                 return true;
             }
-        }, new AddPartitionsToTxnResponse(0, singletonMap(topicPartition, error)));
+        };
     }
 
     private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {