You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/16 00:18:11 UTC
kafka git commit: KAFKA-5449; Fix race condition on producer dequeuing of EndTxn request
Repository: kafka
Updated Branches:
refs/heads/trunk 6cc29e321 -> 54a3718a9
KAFKA-5449; Fix race condition on producer dequeuing of EndTxn request
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3348 from hachikuji/fix-has-unflushed-synchronization
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54a3718a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54a3718a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54a3718a
Branch: refs/heads/trunk
Commit: 54a3718a900a5286baf2193713ea8d58ca2c08f6
Parents: 6cc29e3
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Jun 16 01:17:54 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 16 01:17:54 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 4 +-
.../producer/internals/IncompleteBatches.java | 59 ++++++
.../producer/internals/ProducerBatch.java | 2 +-
.../producer/internals/RecordAccumulator.java | 64 ++----
.../clients/producer/internals/Sender.java | 22 +-
.../producer/internals/TransactionManager.java | 26 ++-
.../internals/RecordAccumulatorTest.java | 80 +++++++-
.../internals/TransactionManagerTest.java | 199 +++++++++++++++----
8 files changed, 341 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1be30f1..5be20e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -496,7 +496,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void commitTransaction() throws ProducerFencedException {
if (transactionManager == null)
throw new IllegalStateException("Cannot commit transaction since transactions are not enabled");
- TransactionalRequestResult result = transactionManager.beginCommittingTransaction();
+ TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await();
}
@@ -510,7 +510,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
public void abortTransaction() throws ProducerFencedException {
if (transactionManager == null)
throw new IllegalStateException("Cannot abort transaction since transactions are not enabled.");
- TransactionalRequestResult result = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
new file mode 100644
index 0000000..93b9596
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/IncompleteBatches.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+
+/*
+ * A thread-safe helper class to hold batches that haven't been acknowledged yet (including those
+ * which have and have not been sent).
+ */
+class IncompleteBatches {
+ private final Set<ProducerBatch> incomplete;
+
+ public IncompleteBatches() {
+ this.incomplete = new HashSet<>();
+ }
+
+ public void add(ProducerBatch batch) {
+ synchronized (incomplete) {
+ this.incomplete.add(batch);
+ }
+ }
+
+ public void remove(ProducerBatch batch) {
+ synchronized (incomplete) {
+ boolean removed = this.incomplete.remove(batch);
+ if (!removed)
+ throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
+ }
+ }
+
+ public Iterable<ProducerBatch> copyAll() {
+ synchronized (incomplete) {
+ return new ArrayList<>(this.incomplete);
+ }
+ }
+
+ public boolean isEmpty() {
+ synchronized (incomplete) {
+ return incomplete.isEmpty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 9d8b82d..5679c26 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -56,7 +56,7 @@ public final class ProducerBatch {
private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class);
- private enum FinalState { ABORTED, FAILED, SUCCEEDED };
+ private enum FinalState { ABORTED, FAILED, SUCCEEDED }
final long createdMs;
final TopicPartition topicPartition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 505417c..10c68d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -100,8 +100,8 @@ public final class RecordAccumulator {
* @param metrics The metrics
* @param time The time instance to use
* @param apiVersions Request API versions for current connected brokers
- * @param transactionManager The shared transaction state object which tracks Pids, epochs, and sequence numbers per
- * partition.
+ * @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
+ * numbers per partition.
*/
public RecordAccumulator(int batchSize,
long totalSize,
@@ -399,9 +399,9 @@ public final class RecordAccumulator {
}
/**
- * @return Whether there is any unsent record in the accumulator.
+ * Check whether there are any batches which haven't been drained
*/
- public boolean hasUnsent() {
+ public boolean hasUndrained() {
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
synchronized (deque) {
@@ -569,19 +569,18 @@ public final class RecordAccumulator {
*/
public void awaitFlushCompletion() throws InterruptedException {
try {
- for (ProducerBatch batch : this.incomplete.all())
+ for (ProducerBatch batch : this.incomplete.copyAll())
batch.produceFuture.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
}
- public boolean hasUnflushedBatches() {
- for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches().entrySet()) {
- if (!entry.getValue().isEmpty())
- return true;
- }
- return !this.incomplete.incomplete.isEmpty();
+ /**
+ * Check whether there are any pending batches (whether sent or unsent).
+ */
+ public boolean hasIncomplete() {
+ return !this.incomplete.isEmpty();
}
/**
@@ -610,8 +609,11 @@ public final class RecordAccumulator {
abortBatches(new IllegalStateException("Producer is closed forcefully."));
}
+ /**
+ * Abort all incomplete batches (whether they have been sent or not)
+ */
void abortBatches(final RuntimeException reason) {
- for (ProducerBatch batch : incomplete.all()) {
+ for (ProducerBatch batch : incomplete.copyAll()) {
Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
synchronized (dq) {
batch.abortRecordAppends();
@@ -622,8 +624,11 @@ public final class RecordAccumulator {
}
}
- void abortOpenBatches(RuntimeException reason) {
- for (ProducerBatch batch : incomplete.all()) {
+ /**
+ * Abort any batches which have not been drained
+ */
+ void abortUndrainedBatches(RuntimeException reason) {
+ for (ProducerBatch batch : incomplete.copyAll()) {
Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
boolean aborted = false;
synchronized (dq) {
@@ -685,35 +690,4 @@ public final class RecordAccumulator {
}
}
- /*
- * A threadsafe helper class to hold batches that haven't been ack'd yet
- */
- private final static class IncompleteBatches {
- private final Set<ProducerBatch> incomplete;
-
- public IncompleteBatches() {
- this.incomplete = new HashSet<>();
- }
-
- public void add(ProducerBatch batch) {
- synchronized (incomplete) {
- this.incomplete.add(batch);
- }
- }
-
- public void remove(ProducerBatch batch) {
- synchronized (incomplete) {
- boolean removed = this.incomplete.remove(batch);
- if (!removed)
- throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
- }
- }
-
- public Iterable<ProducerBatch> all() {
- synchronized (incomplete) {
- return new ArrayList<>(this.incomplete);
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index e2bff27..27eb244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -170,7 +170,7 @@ public class Sender implements Runnable {
// okay we stopped accepting requests but there may still be
// requests in the accumulator or waiting for acknowledgment,
// wait until these are completed.
- while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
+ while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
try {
run(time.milliseconds());
} catch (Exception e) {
@@ -201,7 +201,7 @@ public class Sender implements Runnable {
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
- } else if (transactionManager.hasInflightRequest() || maybeSendTransactionalRequest(now)) {
+ } else if (transactionManager.hasInFlightRequest() || maybeSendTransactionalRequest(now)) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
return;
@@ -216,7 +216,7 @@ public class Sender implements Runnable {
client.poll(retryBackoffMs, now);
return;
} else if (transactionManager.hasAbortableError()) {
- accumulator.abortOpenBatches(transactionManager.lastError());
+ accumulator.abortUndrainedBatches(transactionManager.lastError());
}
}
@@ -302,13 +302,9 @@ public class Sender implements Runnable {
}
private boolean maybeSendTransactionalRequest(long now) {
- if (transactionManager.isCompletingTransaction() &&
- !transactionManager.hasPartitionsToAdd() &&
- accumulator.hasUnflushedBatches()) {
-
- // If the transaction is being aborted, then we can clear any unsent produce requests
+ if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
if (transactionManager.isAborting())
- accumulator.abortOpenBatches(new KafkaException("Failing batch since transaction was aborted"));
+ accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
// There may still be requests left which are being retried. Since we do not know whether they had
// been successfully appended to the broker log, we must resend them until their final status is clear.
@@ -316,13 +312,9 @@ public class Sender implements Runnable {
// be correct which would lead to an OutOfSequenceException.
if (!accumulator.flushInProgress())
accumulator.beginFlush();
-
- // Do not send the EndTxn until all pending batches have been completed
- if (accumulator.hasUnflushedBatches())
- return false;
}
- TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+ TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler(accumulator.hasIncomplete());
if (nextRequestHandler == null)
return false;
@@ -377,7 +369,7 @@ public class Sender implements Runnable {
}
private void maybeAbortBatches(RuntimeException exception) {
- if (accumulator.hasUnflushedBatches()) {
+ if (accumulator.hasIncomplete()) {
String logPrefix = "";
if (transactionManager != null)
logPrefix = transactionManager.logPrefix;
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5c498db..f2deca3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -173,14 +173,14 @@ public class TransactionManager {
transitionTo(State.IN_TRANSACTION);
}
- public synchronized TransactionalRequestResult beginCommittingTransaction() {
+ public synchronized TransactionalRequestResult beginCommit() {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(TransactionResult.COMMIT);
}
- public synchronized TransactionalRequestResult beginAbortingTransaction() {
+ public synchronized TransactionalRequestResult beginAbort() {
ensureTransactional();
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
@@ -268,7 +268,7 @@ public class TransactionManager {
return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
}
- synchronized boolean isCompletingTransaction() {
+ synchronized boolean isCompleting() {
return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
}
@@ -377,18 +377,26 @@ public class TransactionManager {
sequenceNumbers.put(topicPartition, currentSequenceNumber);
}
- synchronized TxnRequestHandler nextRequestHandler() {
+ synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
- TxnRequestHandler nextRequestHandler = pendingRequests.poll();
- if (nextRequestHandler != null && maybeTerminateRequestWithError(nextRequestHandler)) {
+ TxnRequestHandler nextRequestHandler = pendingRequests.peek();
+ if (nextRequestHandler == null)
+ return null;
+
+ // Do not send the EndTxn until all batches have been flushed
+ if (nextRequestHandler.isEndTxn() && hasIncompleteBatches)
+ return null;
+
+ pendingRequests.poll();
+ if (maybeTerminateRequestWithError(nextRequestHandler)) {
log.trace("{}Not sending transactional request {} because we are in an error state",
logPrefix, nextRequestHandler.requestBuilder());
return null;
}
- if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) {
+ if (nextRequestHandler.isEndTxn() && !transactionStarted) {
nextRequestHandler.result.done();
if (currentState != State.FATAL_ERROR) {
log.debug("{}Not sending EndTxn for completed transaction since no partitions " +
@@ -432,7 +440,7 @@ public class TransactionManager {
inFlightRequestCorrelationId = NO_INFLIGHT_REQUEST_CORRELATION_ID;
}
- boolean hasInflightRequest() {
+ boolean hasInFlightRequest() {
return inFlightRequestCorrelationId != NO_INFLIGHT_REQUEST_CORRELATION_ID;
}
@@ -459,7 +467,7 @@ public class TransactionManager {
// visible for testing
synchronized boolean hasOngoingTransaction() {
// transactions are considered ongoing once started until completion or a fatal error
- return currentState == State.IN_TRANSACTION || isCompletingTransaction() || hasAbortableError();
+ return currentState == State.IN_TRANSACTION || isCompleting() || hasAbortableError();
}
// visible for testing
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3929156..81487b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -322,8 +323,10 @@ public class RecordAccumulatorTest {
long lingerMs = Long.MAX_VALUE;
final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < 100; i++) {
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs);
+ assertTrue(accum.hasIncomplete());
+ }
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
@@ -332,13 +335,16 @@ public class RecordAccumulatorTest {
// drain and deallocate all batches
Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertTrue(accum.hasIncomplete());
+
for (List<ProducerBatch> batches: results.values())
for (ProducerBatch batch: batches)
accum.deallocate(batch);
// should be complete with no unsent records.
accum.awaitFlushCompletion();
- assertFalse(accum.hasUnsent());
+ assertFalse(accum.hasUndrained());
+ assertFalse(accum.hasIncomplete());
}
@@ -369,12 +375,13 @@ public class RecordAccumulatorTest {
}
}
-
@Test
public void testAbortIncompleteBatches() throws Exception {
long lingerMs = Long.MAX_VALUE;
+ int numRecords = 100;
+
final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
- final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+ final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
class TestCallback implements Callback {
@Override
@@ -383,14 +390,71 @@ public class RecordAccumulatorTest {
numExceptionReceivedInCallback.incrementAndGet();
}
}
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < numRecords; i++)
accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
- assertEquals("No nodes should be ready.", 0, result.readyNodes.size());
+ assertFalse(result.readyNodes.isEmpty());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertTrue(accum.hasUndrained());
+ assertTrue(accum.hasIncomplete());
+
+ int numDrainedRecords = 0;
+ for (Map.Entry<Integer, List<ProducerBatch>> drainedEntry : drained.entrySet()) {
+ for (ProducerBatch batch : drainedEntry.getValue()) {
+ assertTrue(batch.isClosed());
+ assertFalse(batch.produceFuture.completed());
+ numDrainedRecords += batch.recordCount;
+ }
+ }
+ assertTrue(numDrainedRecords > 0 && numDrainedRecords < numRecords);
accum.abortIncompleteBatches();
- assertEquals(numExceptionReceivedInCallback.get(), 100);
- assertFalse(accum.hasUnsent());
+ assertEquals(numRecords, numExceptionReceivedInCallback.get());
+ assertFalse(accum.hasUndrained());
+ assertFalse(accum.hasIncomplete());
+ }
+
+ @Test
+ public void testAbortUnsentBatches() throws Exception {
+ long lingerMs = Long.MAX_VALUE;
+ int numRecords = 100;
+
+ final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
+ final RecordAccumulator accum = new RecordAccumulator(128 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
+ CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
+ final KafkaException cause = new KafkaException();
+
+ class TestCallback implements Callback {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ assertEquals(cause, exception);
+ numExceptionReceivedInCallback.incrementAndGet();
+ }
+ }
+ for (int i = 0; i < numRecords; i++)
+ accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, new TestCallback(), maxBlockTimeMs);
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
+ assertFalse(result.readyNodes.isEmpty());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
+ time.milliseconds());
+ assertTrue(accum.hasUndrained());
+ assertTrue(accum.hasIncomplete());
+
+ accum.abortUndrainedBatches(cause);
+ int numDrainedRecords = 0;
+ for (Map.Entry<Integer, List<ProducerBatch>> drainedEntry : drained.entrySet()) {
+ for (ProducerBatch batch : drainedEntry.getValue()) {
+ assertTrue(batch.isClosed());
+ assertFalse(batch.produceFuture.completed());
+ numDrainedRecords += batch.recordCount;
+ }
+ }
+
+ assertTrue(numDrainedRecords > 0);
+ assertTrue(numExceptionReceivedInCallback.get() > 0);
+ assertEquals(numRecords, numExceptionReceivedInCallback.get() + numDrainedRecords);
+ assertFalse(accum.hasUndrained());
+ assertTrue(accum.hasIncomplete());
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/54a3718a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index bdad483..14b2835 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -124,6 +124,23 @@ public class TransactionManagerTest {
client.setNode(brokerNode);
}
+ @Test
+ public void testEndTxnNotSentIfIncompleteBatches() {
+ long pid = 13131L;
+ short epoch = 1;
+ doInitTransactions(pid, epoch);
+ transactionManager.beginTransaction();
+
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ prepareAddPartitionsToTxn(tp0, Errors.NONE);
+ sender.run(time.milliseconds());
+ assertTrue(transactionManager.isPartitionAdded(tp0));
+
+ transactionManager.beginCommit();
+ assertNull(transactionManager.nextRequestHandler(true));
+ assertTrue(transactionManager.nextRequestHandler(false).isEndTxn());
+ }
+
@Test(expected = IllegalStateException.class)
public void testFailIfNotReadyForSendNoProducerId() {
transactionManager.failIfNotReadyForSend();
@@ -188,7 +205,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(partition, Errors.NONE);
sender.run(time.milliseconds());
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
assertTrue(transactionManager.hasOngoingTransaction());
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
@@ -215,7 +232,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(partition, Errors.NONE);
sender.run(time.milliseconds());
- transactionManager.beginCommittingTransaction();
+ transactionManager.beginCommit();
assertTrue(transactionManager.hasOngoingTransaction());
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
@@ -245,7 +262,7 @@ public class TransactionManagerTest {
transactionManager.transitionToAbortableError(new KafkaException());
assertTrue(transactionManager.hasOngoingTransaction());
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
assertTrue(transactionManager.hasOngoingTransaction());
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
@@ -529,12 +546,12 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasPendingOffsetCommits());
assertTrue(addOffsetsResult.isCompleted()); // We should only be done after both RPCs complete.
- transactionManager.beginCommittingTransaction();
+ transactionManager.beginCommit();
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
sender.run(time.milliseconds()); // commit.
assertFalse(transactionManager.hasOngoingTransaction());
- assertFalse(transactionManager.isCompletingTransaction());
+ assertFalse(transactionManager.isCompleting());
assertFalse(transactionManager.transactionContainsPartition(tp0));
}
@@ -866,7 +883,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds());
assertTrue(transactionManager.hasAbortableError());
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
sender.run(time.milliseconds());
assertTrue(responseFuture.isDone());
assertFutureFailed(responseFuture);
@@ -875,7 +892,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds());
assertTrue(transactionManager.isReady());
assertFalse(transactionManager.hasPartitionsToAdd());
- assertFalse(accumulator.hasUnflushedBatches());
+ assertFalse(accumulator.hasIncomplete());
// ensure we can now start a new transaction
@@ -890,7 +907,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.isPartitionAdded(tp0));
assertFalse(transactionManager.hasPartitionsToAdd());
- transactionManager.beginCommittingTransaction();
+ transactionManager.beginCommit();
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.run(time.milliseconds());
@@ -932,14 +949,14 @@ public class TransactionManagerTest {
assertFalse(unauthorizedTopicProduceFuture.isDone());
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
sender.run(time.milliseconds());
// neither produce request has been sent, so they should both be failed immediately
assertFutureFailed(authorizedTopicProduceFuture);
assertFutureFailed(unauthorizedTopicProduceFuture);
assertTrue(transactionManager.isReady());
assertFalse(transactionManager.hasPartitionsToAdd());
- assertFalse(accumulator.hasUnflushedBatches());
+ assertFalse(accumulator.hasIncomplete());
// ensure we can now start a new transaction
@@ -954,7 +971,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.isPartitionAdded(tp0));
assertFalse(transactionManager.hasPartitionsToAdd());
- transactionManager.beginCommittingTransaction();
+ transactionManager.beginCommit();
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.run(time.milliseconds());
@@ -988,7 +1005,7 @@ public class TransactionManagerTest {
prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
sender.run(time.milliseconds());
assertFalse(authorizedTopicProduceFuture.isDone());
- assertTrue(accumulator.hasUnflushedBatches());
+ assertTrue(accumulator.hasIncomplete());
transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
@@ -1007,12 +1024,12 @@ public class TransactionManagerTest {
assertNotNull(authorizedTopicProduceFuture.get());
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
sender.run(time.milliseconds());
// neither produce request has been sent, so they should both be failed immediately
assertTrue(transactionManager.isReady());
assertFalse(transactionManager.hasPartitionsToAdd());
- assertFalse(accumulator.hasUnflushedBatches());
+ assertFalse(accumulator.hasIncomplete());
// ensure we can now start a new transaction
@@ -1027,7 +1044,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.isPartitionAdded(tp0));
assertFalse(transactionManager.hasPartitionsToAdd());
- transactionManager.beginCommittingTransaction();
+ transactionManager.beginCommit();
prepareProduceResponse(Errors.NONE, pid, epoch);
sender.run(time.milliseconds());
@@ -1075,7 +1092,7 @@ public class TransactionManagerTest {
assertFalse(responseFuture.isDone());
- TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+ TransactionalRequestResult commitResult = transactionManager.beginCommit();
// we have an append, an add partitions request, and now also an endtxn.
// The order should be:
@@ -1097,7 +1114,7 @@ public class TransactionManagerTest {
prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
assertFalse(commitResult.isCompleted());
assertTrue(transactionManager.hasOngoingTransaction());
- assertTrue(transactionManager.isCompletingTransaction());
+ assertTrue(transactionManager.isCompleting());
sender.run(time.milliseconds());
assertTrue(commitResult.isCompleted());
@@ -1193,7 +1210,7 @@ public class TransactionManagerTest {
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
- TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+ TransactionalRequestResult commitResult = transactionManager.beginCommit();
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
@@ -1219,7 +1236,7 @@ public class TransactionManagerTest {
}
// Commit is not allowed, so let's abort and try again.
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.run(time.milliseconds()); // Send abort request. It is valid to transition from ERROR to ABORT
@@ -1249,7 +1266,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // Send AddPartitionsRequest
sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException.
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
sender.run(time.milliseconds()); // try to abort
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
@@ -1275,7 +1292,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // Send AddPartitionsRequest
sender.run(time.milliseconds()); // Send Produce Request
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
assertTrue(transactionManager.isAborting());
assertFalse(transactionManager.hasError());
@@ -1294,6 +1311,110 @@ public class TransactionManagerTest {
}
@Test
+ public void testCommitTransactionWithUnsentProduceRequest() throws Exception {
+ final long pid = 13131L;
+ final short epoch = 1;
+ // Verifies EndTxn is held back until a batch that was still unsent when the commit began has been drained, sent and completed.
+ doInitTransactions(pid, epoch);
+ // begin a transaction and add tp0 to it
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ // append one record to tp0; the resulting batch is not sent yet
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+ // the first run handles AddPartitionsToTxn; the batch is still undrained afterwards
+ prepareAddPartitionsToTxn(tp0, Errors.NONE);
+ sender.run(time.milliseconds());
+ assertTrue(accumulator.hasUndrained());
+
+ // beginning the commit should cause the unsent batch to be flushed: it is drained and sent, but its response is still pending
+ transactionManager.beginCommit();
+ sender.run(time.milliseconds());
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertFalse(responseFuture.isDone());
+
+ // until the produce future completes, EndTxn must not be sent (no transactional request in flight)
+ sender.run(time.milliseconds());
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertFalse(responseFuture.isDone());
+
+ // now the produce response returns and completes the batch
+ sendProduceResponse(Errors.NONE, pid, epoch);
+ sender.run(time.milliseconds());
+ assertTrue(responseFuture.isDone());
+ assertFalse(accumulator.hasUndrained());
+ assertFalse(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+
+ // with no incomplete batches left, the next run sends EndTxn; its successful response makes the manager ready again
+ sender.run(time.milliseconds());
+ assertTrue(transactionManager.hasInFlightRequest());
+ sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertTrue(transactionManager.isReady());
+ }
+
+ @Test
+ public void testCommitTransactionWithInFlightProduceRequest() throws Exception {
+ final long pid = 13131L;
+ final short epoch = 1;
+ // Verifies EndTxn is held back while a produce request that was already in flight when the commit began is still incomplete.
+ doInitTransactions(pid, epoch);
+ // begin a transaction and add tp0 to it
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ // append one record to tp0; the resulting batch is not sent yet
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+ // the first run handles AddPartitionsToTxn; the batch is still undrained afterwards
+ prepareAddPartitionsToTxn(tp0, Errors.NONE);
+ sender.run(time.milliseconds());
+ assertTrue(accumulator.hasUndrained());
+ // flush explicitly so the batch is drained and its produce request sent before the commit begins
+ accumulator.beginFlush();
+ sender.run(time.milliseconds());
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+
+ // now we begin the commit with the produce request still pending; nothing new is drained or sent
+ transactionManager.beginCommit();
+ sender.run(time.milliseconds());
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertFalse(responseFuture.isDone());
+
+ // until the produce future completes, EndTxn must not be sent (no transactional request in flight)
+ sender.run(time.milliseconds());
+ assertFalse(accumulator.hasUndrained());
+ assertTrue(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertFalse(responseFuture.isDone());
+
+ // now the produce response returns and completes the batch
+ sendProduceResponse(Errors.NONE, pid, epoch);
+ sender.run(time.milliseconds());
+ assertTrue(responseFuture.isDone());
+ assertFalse(accumulator.hasUndrained());
+ assertFalse(accumulator.hasIncomplete());
+ assertFalse(transactionManager.hasInFlightRequest());
+
+ // with no incomplete batches left, the next run sends EndTxn; its successful response makes the manager ready again
+ sender.run(time.milliseconds());
+ assertTrue(transactionManager.hasInFlightRequest());
+ sendEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+ sender.run(time.milliseconds());
+ assertFalse(transactionManager.hasInFlightRequest());
+ assertTrue(transactionManager.isReady());
+ }
+
+ @Test
public void testFindCoordinatorAllowedInAbortableErrorState() throws InterruptedException {
final long pid = 13131L;
final short epoch = 1;
@@ -1336,7 +1457,7 @@ public class TransactionManagerTest {
assertFalse(responseFuture.isDone());
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
// note since no partitions were added to the transaction, no EndTxn will be sent
sender.run(time.milliseconds()); // try to abort
@@ -1369,7 +1490,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // Send AddPartitions and let it fail
assertFalse(responseFuture.isDone());
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
// we should resend the AddPartitions
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
@@ -1410,7 +1531,7 @@ public class TransactionManagerTest {
assertFalse(responseFuture.isDone());
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
// we should resend the ProduceRequest before aborting
prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
@@ -1518,7 +1639,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
sender.run(time.milliseconds()); // Send AddPartitionsRequest
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
assertFalse(abortResult.isCompleted());
sender.run(time.milliseconds());
@@ -1540,7 +1661,7 @@ public class TransactionManagerTest {
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareAddOffsetsToTxnResponse(Errors.GROUP_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
@@ -1566,7 +1687,7 @@ public class TransactionManagerTest {
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareAddOffsetsToTxnResponse(Errors.UNKNOWN, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // Send AddOffsetsToTxnRequest
@@ -1839,7 +1960,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
assertFalse(responseFuture.isDone());
- TransactionalRequestResult commitResult = transactionManager.beginCommittingTransaction();
+ TransactionalRequestResult commitResult = transactionManager.beginCommit();
// Sleep 10 seconds to make sure that the batches in the queue would be expired if they can't be drained.
time.sleep(10000);
@@ -1866,10 +1987,10 @@ public class TransactionManagerTest {
assertTrue(transactionManager.hasAbortableError());
assertTrue(transactionManager.hasOngoingTransaction());
- assertFalse(transactionManager.isCompletingTransaction());
+ assertFalse(transactionManager.isCompleting());
assertTrue(transactionManager.transactionContainsPartition(tp0));
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ TransactionalRequestResult abortResult = transactionManager.beginAbort();
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
@@ -1995,7 +2116,15 @@ public class TransactionManagerTest {
}
private void prepareEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
- client.prepareResponse(new MockClient.RequestMatcher() {
+ client.prepareResponse(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
+ }
+ // Unlike prepareEndTxnResponse (called before the request is sent), this is used once EndTxn is already in flight; presumably MockClient.respond answers that pending request — confirm against MockClient.
+ private void sendEndTxnResponse(Errors error, final TransactionResult result, final long pid, final short epoch) {
+ client.respond(endTxnMatcher(result, pid, epoch), new EndTxnResponse(0, error));
+ }
+
+ private MockClient.RequestMatcher endTxnMatcher(final TransactionResult result, final long pid, final short epoch) {
+ return new MockClient.RequestMatcher() {
@Override
public boolean matches(AbstractRequest body) {
EndTxnRequest endTxnRequest = (EndTxnRequest) body;
@@ -2005,7 +2134,7 @@ public class TransactionManagerTest {
assertEquals(result, endTxnRequest.command());
return true;
}
- }, new EndTxnResponse(0, error));
+ };
}
private void prepareAddOffsetsToTxnResponse(Errors error, final String consumerGroupId, final long producerId,
@@ -2066,7 +2195,7 @@ public class TransactionManagerTest {
}
assertTrue(transactionManager.hasError());
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
assertFalse(transactionManager.hasError());
}
@@ -2074,7 +2203,7 @@ public class TransactionManagerTest {
assertTrue(transactionManager.hasError());
try {
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
@@ -2083,7 +2212,7 @@ public class TransactionManagerTest {
// Transaction abort cannot clear fatal error state
try {
- transactionManager.beginAbortingTransaction();
+ transactionManager.beginAbort();
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));