You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/06 12:59:18 UTC
kafka git commit: KAFKA-5364;
ensurePartitionAdded does not handle pending partitions in abortable
error state [Forced Update!]
Repository: kafka
Updated Branches:
refs/heads/trunk 3eed19458 -> cb78ba129 (forced update)
KAFKA-5364; ensurePartitionAdded does not handle pending partitions in abortable error state
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #3231 from hachikuji/KAFKA-5364
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb78ba12
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb78ba12
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb78ba12
Branch: refs/heads/trunk
Commit: cb78ba1294d1a27b2c4d842a125486ffec593d98
Parents: 313f8d7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jun 6 13:57:20 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 6 13:58:51 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 16 +-
.../producer/internals/RecordAccumulator.java | 4 +-
.../producer/internals/TransactionManager.java | 78 +++--
.../internals/RecordAccumulatorTest.java | 1 -
.../clients/producer/internals/SenderTest.java | 4 +-
.../internals/TransactionManagerTest.java | 342 ++++++++++++++++++-
6 files changed, 381 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1d16721..0f109d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -608,7 +608,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
if (transactionManager != null)
- ensureProperTransactionalState();
+ transactionManager.failIfUnreadyForSend();
TopicPartition tp = null;
try {
@@ -691,20 +691,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
- private void ensureProperTransactionalState() {
- if (transactionManager.isTransactional() && !transactionManager.hasProducerId())
- throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
- "when transactions are enabled.");
-
- if (transactionManager.hasError()) {
- Exception lastError = transactionManager.lastError();
- throw new KafkaException("Cannot perform send because at least one previous transactional or " +
- "idempotent request has failed with errors.", lastError);
- }
- if (transactionManager.isCompletingTransaction())
- throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
- }
-
private void setReadOnly(Headers headers) {
if (headers instanceof RecordHeaders) {
((RecordHeaders) headers).setReadOnly();
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 2c4917d..0315b13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -469,7 +469,7 @@ public final class RecordAccumulator {
ProducerIdAndEpoch producerIdAndEpoch = null;
boolean isTransactional = false;
if (transactionManager != null) {
- if (!transactionManager.ensurePartitionAdded(tp))
+ if (!transactionManager.sendToPartitionAllowed(tp))
break;
producerIdAndEpoch = transactionManager.producerIdAndEpoch();
@@ -477,7 +477,7 @@ public final class RecordAccumulator {
// we cannot send the batch until we have refreshed the producer id
break;
- isTransactional = transactionManager.isInTransaction();
+ isTransactional = transactionManager.isTransactional();
}
ProducerBatch batch = deque.pollFirst();
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c081b23..2842cd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -205,7 +205,7 @@ public class TransactionManager {
}
public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
- String consumerGroupId) {
+ String consumerGroupId) {
ensureTransactional();
maybeFailWithError();
if (currentState != State.IN_TRANSACTION)
@@ -221,34 +221,39 @@ public class TransactionManager {
}
public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
- if (!isInTransaction())
- throw new IllegalArgumentException("Cannot add partitions to a transaction in state " + currentState);
+ if (currentState != State.IN_TRANSACTION)
+ throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
- if (partitionsInTransaction.contains(topicPartition))
+ if (partitionsInTransaction.contains(topicPartition) || pendingPartitionsInTransaction.contains(topicPartition))
return;
log.debug("{}Begin adding new partition {} to transaction", logPrefix, topicPartition);
newPartitionsInTransaction.add(topicPartition);
}
- public RuntimeException lastError() {
+ RuntimeException lastError() {
return lastError;
}
- public synchronized boolean ensurePartitionAdded(TopicPartition tp) {
+ public synchronized void failIfUnreadyForSend() {
+ if (hasError())
+ throw new KafkaException("Cannot perform send because at least one previous transactional or " +
+ "idempotent request has failed with errors.", lastError);
+
+ if (isTransactional()) {
+ if (!hasProducerId())
+ throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
+ "when transactions are enabled.");
+
+ if (currentState != State.IN_TRANSACTION)
+ throw new IllegalStateException("Cannot call send in state " + currentState);
+ }
+ }
+
+ synchronized boolean sendToPartitionAllowed(TopicPartition tp) {
if (hasFatalError())
return false;
- if (isInTransaction() || hasAbortableError()) {
- // We should enter this branch in an error state because if this partition is already in the transaction,
- // there is a chance that the corresponding batch is in retry. So we must let it completely flush.
- if (!(partitionsInTransaction.contains(tp) || isPartitionPending(tp))) {
- transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " +
- "for partition " + tp + ", which would never be added to the transaction."));
- return false;
- }
- return partitionsInTransaction.contains(tp);
- }
- return true;
+ return !isTransactional() || partitionsInTransaction.contains(tp);
}
public String transactionalId() {
@@ -263,26 +268,22 @@ public class TransactionManager {
return transactionalId != null;
}
- public synchronized boolean hasPartitionsToAdd() {
+ synchronized boolean hasPartitionsToAdd() {
return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
}
- public synchronized boolean isCompletingTransaction() {
+ synchronized boolean isCompletingTransaction() {
return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
}
- public synchronized boolean hasError() {
+ synchronized boolean hasError() {
return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
}
- public synchronized boolean isAborting() {
+ synchronized boolean isAborting() {
return currentState == State.ABORTING_TRANSACTION;
}
- synchronized boolean isInTransaction() {
- return currentState == State.IN_TRANSACTION || isCompletingTransaction();
- }
-
synchronized void transitionToAbortableError(RuntimeException exception) {
transitionTo(State.ABORTABLE_ERROR, exception);
}
@@ -291,6 +292,16 @@ public class TransactionManager {
transitionTo(State.FATAL_ERROR, exception);
}
+ // visible for testing
+ synchronized boolean isPartitionAdded(TopicPartition partition) {
+ return partitionsInTransaction.contains(partition);
+ }
+
+ // visible for testing
+ synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
+ return newPartitionsInTransaction.contains(partition) || pendingPartitionsInTransaction.contains(partition);
+ }
+
/**
* Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
* verify that the result is valid.
@@ -437,21 +448,23 @@ public class TransactionManager {
// visible for testing
synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
- return isInTransaction() && partitionsInTransaction.contains(topicPartition);
+ return partitionsInTransaction.contains(topicPartition);
}
// visible for testing
synchronized boolean hasPendingOffsetCommits() {
- return isInTransaction() && !pendingTxnOffsetCommits.isEmpty();
+ return !pendingTxnOffsetCommits.isEmpty();
}
// visible for testing
- synchronized boolean isReady() {
- return isTransactional() && currentState == State.READY;
+ synchronized boolean hasOngoingTransaction() {
+ // transactions are considered ongoing once started until completion or a fatal error
+ return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
}
- private synchronized boolean isPartitionPending(TopicPartition tp) {
- return isInTransaction() && (pendingPartitionsInTransaction.contains(tp) || newPartitionsInTransaction.contains(tp));
+ // visible for testing
+ synchronized boolean isReady() {
+ return isTransactional() && currentState == State.READY;
}
private void transitionTo(State target) {
@@ -472,8 +485,7 @@ public class TransactionManager {
}
if (lastError != null)
- log.error("{}Transition from state {} to error state {}", logPrefix, currentState,
- target, lastError);
+ log.debug("{}Transition from state {} to error state {}", logPrefix, currentState, target, lastError);
else
log.debug("{}Transition from state {} to {}", logPrefix, currentState, target);
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 2875ba2..7b9f26b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -394,7 +394,6 @@ public class RecordAccumulatorTest {
accum.abortIncompleteBatches();
assertEquals(numExceptionReceivedInCallback.get(), 100);
assertFalse(accum.hasUnsent());
-
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 927a937..26093d7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -624,7 +624,7 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L));
- client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()),
+ client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isTransactional()),
new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
@@ -640,7 +640,7 @@ public class SenderTest {
assertTrue("Client ready status should be true", client.isReady(node, 0L));
responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L));
- client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()),
+ client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isTransactional()),
new ProduceResponse(responseMap));
sender.run(time.milliseconds()); // receive
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb78ba12/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 6e633ec..f661baf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -123,6 +123,329 @@ public class TransactionManagerTest {
}
@Test(expected = IllegalStateException.class)
+ public void testFailIfUnreadyForSendNoProducerId() {
+ transactionManager.failIfUnreadyForSend();
+ }
+
+ @Test
+ public void testFailIfUnreadyForSendIdempotentProducer() {
+ TransactionManager idempotentTransactionManager = new TransactionManager();
+ idempotentTransactionManager.failIfUnreadyForSend();
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testFailIfUnreadyForSendIdempotentProducerFatalError() {
+ TransactionManager idempotentTransactionManager = new TransactionManager();
+ idempotentTransactionManager.transitionToFatalError(new KafkaException());
+ idempotentTransactionManager.failIfUnreadyForSend();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFailIfUnreadyForSendNoOngoingTransaction() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.failIfUnreadyForSend();
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testFailIfUnreadyForSendAfterAbortableError() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.beginTransaction();
+ transactionManager.transitionToAbortableError(new KafkaException());
+ transactionManager.failIfUnreadyForSend();
+ }
+
+ @Test(expected = KafkaException.class)
+ public void testFailIfUnreadyForSendAfterFatalError() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.transitionToFatalError(new KafkaException());
+ transactionManager.failIfUnreadyForSend();
+ }
+
+ @Test
+ public void testHasOngoingTransactionSuccessfulAbort() {
+ long pid = 13131L;
+ short epoch = 1;
+ TopicPartition partition = new TopicPartition("foo", 0);
+
+ assertFalse(transactionManager.hasOngoingTransaction());
+ doInitTransactions(pid, epoch);
+ assertFalse(transactionManager.hasOngoingTransaction());
+
+ transactionManager.beginTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareAddPartitionsToTxn(partition, Errors.NONE);
+ sender.run(time.milliseconds());
+
+ transactionManager.beginAbortingTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasOngoingTransaction());
+ }
+
+ @Test
+ public void testHasOngoingTransactionSuccessfulCommit() {
+ long pid = 13131L;
+ short epoch = 1;
+ TopicPartition partition = new TopicPartition("foo", 0);
+
+ assertFalse(transactionManager.hasOngoingTransaction());
+ doInitTransactions(pid, epoch);
+ assertFalse(transactionManager.hasOngoingTransaction());
+
+ transactionManager.beginTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareAddPartitionsToTxn(partition, Errors.NONE);
+ sender.run(time.milliseconds());
+
+ transactionManager.beginCommittingTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasOngoingTransaction());
+ }
+
+ @Test
+ public void testHasOngoingTransactionAbortableError() {
+ long pid = 13131L;
+ short epoch = 1;
+ TopicPartition partition = new TopicPartition("foo", 0);
+
+ assertFalse(transactionManager.hasOngoingTransaction());
+ doInitTransactions(pid, epoch);
+ assertFalse(transactionManager.hasOngoingTransaction());
+
+ transactionManager.beginTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareAddPartitionsToTxn(partition, Errors.NONE);
+ sender.run(time.milliseconds());
+
+ transactionManager.transitionToAbortableError(new KafkaException());
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ transactionManager.beginAbortingTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasOngoingTransaction());
+ }
+
+ @Test
+ public void testHasOngoingTransactionFatalError() {
+ long pid = 13131L;
+ short epoch = 1;
+ TopicPartition partition = new TopicPartition("foo", 0);
+
+ assertFalse(transactionManager.hasOngoingTransaction());
+ doInitTransactions(pid, epoch);
+ assertFalse(transactionManager.hasOngoingTransaction());
+
+ transactionManager.beginTransaction();
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertTrue(transactionManager.hasOngoingTransaction());
+
+ prepareAddPartitionsToTxn(partition, Errors.NONE);
+ sender.run(time.milliseconds());
+
+ transactionManager.transitionToFatalError(new KafkaException());
+ assertFalse(transactionManager.hasOngoingTransaction());
+ }
+
+ @Test
+ public void testMaybeAddPartitionToTransaction() {
+ long pid = 13131L;
+ short epoch = 1;
+ TopicPartition partition = new TopicPartition("foo", 0);
+ doInitTransactions(pid, epoch);
+ transactionManager.beginTransaction();
+
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertTrue(transactionManager.hasPartitionsToAdd());
+ assertFalse(transactionManager.isPartitionAdded(partition));
+ assertTrue(transactionManager.isPartitionPendingAdd(partition));
+
+ prepareAddPartitionsToTxn(partition, Errors.NONE);
+ sender.run(time.milliseconds());
+
+ assertFalse(transactionManager.hasPartitionsToAdd());
+ assertTrue(transactionManager.isPartitionAdded(partition));
+ assertFalse(transactionManager.isPartitionPendingAdd(partition));
+
+ // adding the partition again should not have any effect
+ transactionManager.maybeAddPartitionToTransaction(partition);
+ assertFalse(transactionManager.hasPartitionsToAdd());
+ assertTrue(transactionManager.isPartitionAdded(partition));
+ assertFalse(transactionManager.isPartitionPendingAdd(partition));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMaybeAddPartitionToTransactionBeforeInitTransactions() {
+ transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMaybeAddPartitionToTransactionBeforeBeginTransaction() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMaybeAddPartitionToTransactionAfterAbortableError() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.beginTransaction();
+ transactionManager.transitionToAbortableError(new KafkaException());
+ transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMaybeAddPartitionToTransactionAfterFatalError() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.transitionToFatalError(new KafkaException());
+ transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithPendingPartitionAfterAbortableError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ transactionManager.transitionToAbortableError(new KafkaException());
+
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasAbortableError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithInFlightPartitionAddAfterAbortableError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ // Send the AddPartitionsToTxn request and leave it in-flight
+ sender.run(time.milliseconds());
+ transactionManager.transitionToAbortableError(new KafkaException());
+
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasAbortableError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithPendingPartitionAfterFatalError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ transactionManager.transitionToFatalError(new KafkaException());
+
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasFatalError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithInFlightPartitionAddAfterFatalError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ // Send the AddPartitionsToTxn request and leave it in-flight
+ sender.run(time.milliseconds());
+ transactionManager.transitionToFatalError(new KafkaException());
+
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasFatalError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithAddedPartitionAfterAbortableError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasPartitionsToAdd());
+ transactionManager.transitionToAbortableError(new KafkaException());
+
+ assertTrue(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasAbortableError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithAddedPartitionAfterFatalError() {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasPartitionsToAdd());
+ transactionManager.transitionToFatalError(new KafkaException());
+
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertTrue(transactionManager.hasFatalError());
+ }
+
+ @Test
+ public void testSendToPartitionAllowedWithUnaddedPartition() {
+ final long pid = 13131L;
+ final short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.beginTransaction();
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ }
+
+ @Test(expected = IllegalStateException.class)
public void testInvalidSequenceIncrement() {
TransactionManager transactionManager = new TransactionManager();
transactionManager.incrementSequenceNumber(tp0, 3333);
@@ -148,8 +471,6 @@ public class TransactionManagerTest {
@Test
public void testBasicTransaction() throws InterruptedException {
- // This is called from the initTransactions method in the producer as the first order of business.
- // It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
@@ -166,11 +487,11 @@ public class TransactionManagerTest {
prepareProduceResponse(Errors.NONE, pid, epoch);
assertFalse(transactionManager.transactionContainsPartition(tp0));
- assertFalse(transactionManager.ensurePartitionAdded(tp0));
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
sender.run(time.milliseconds()); // send addPartitions.
// Check that only addPartitions was sent.
assertTrue(transactionManager.transactionContainsPartition(tp0));
- assertTrue(transactionManager.ensurePartitionAdded(tp0));
+ assertTrue(transactionManager.sendToPartitionAllowed(tp0));
assertFalse(responseFuture.isDone());
sender.run(time.milliseconds()); // send produce request.
@@ -210,7 +531,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.run(time.milliseconds()); // commit.
- assertFalse(transactionManager.isInTransaction());
+ assertFalse(transactionManager.hasOngoingTransaction());
assertFalse(transactionManager.isCompletingTransaction());
assertFalse(transactionManager.transactionContainsPartition(tp0));
}
@@ -507,12 +828,12 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
assertFalse(commitResult.isCompleted());
- assertTrue(transactionManager.isInTransaction());
+ assertTrue(transactionManager.hasOngoingTransaction());
assertTrue(transactionManager.isCompletingTransaction());
sender.run(time.milliseconds());
assertTrue(commitResult.isCompleted());
- assertFalse(transactionManager.isInTransaction());
+ assertFalse(transactionManager.hasOngoingTransaction());
}
@Test
@@ -910,8 +1231,8 @@ public class TransactionManagerTest {
accumulator.append(tp1, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT);
- assertFalse(transactionManager.ensurePartitionAdded(tp0));
- assertFalse(transactionManager.ensurePartitionAdded(tp1));
+ assertFalse(transactionManager.sendToPartitionAllowed(tp0));
+ assertFalse(transactionManager.sendToPartitionAllowed(tp1));
Node node1 = new Node(0, "localhost", 1111);
Node node2 = new Node(1, "localhost", 1112);
@@ -951,7 +1272,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // Send AddPartitions, should be in abortable state.
assertTrue(transactionManager.hasAbortableError());
- assertTrue(transactionManager.ensurePartitionAdded(tp1));
+ assertTrue(transactionManager.sendToPartitionAllowed(tp1));
// Try to drain a message destined for tp1, it should get drained.
Node node1 = new Node(1, "localhost", 1112);
@@ -993,7 +1314,6 @@ public class TransactionManagerTest {
// We shouldn't drain batches which haven't been added to the transaction yet.
assertTrue(drainedBatches.containsKey(node1.id()));
assertTrue(drainedBatches.get(node1.id()).isEmpty());
- assertTrue(transactionManager.hasFatalError());
}
private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {