You are viewing a plain-text version of this content; the canonical link was not preserved in this extraction.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 03:54:08 UTC
kafka git commit: KAFKA-5251; Producer should cancel unsent AddPartitions and Produce requests on abort
Repository: kafka
Updated Branches:
refs/heads/trunk 3250cc767 -> d41cf1b77
KAFKA-5251; Producer should cancel unsent AddPartitions and Produce requests on abort
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #3161 from hachikuji/KAFKA-5251
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d41cf1b7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d41cf1b7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d41cf1b7
Branch: refs/heads/trunk
Commit: d41cf1b77819ede5716b31683d0137eb60cb7bfb
Parents: 3250cc7
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue May 30 20:32:51 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 30 20:32:51 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../producer/internals/RecordAccumulator.java | 19 +-
.../clients/producer/internals/Sender.java | 66 +++---
.../producer/internals/TransactionManager.java | 62 +++---
.../common/requests/AddOffsetsToTxnRequest.java | 3 +-
.../requests/AddPartitionsToTxnRequest.java | 7 +-
.../kafka/common/requests/EndTxnRequest.java | 3 +-
.../kafka/common/requests/FetchRequest.java | 7 +-
.../common/requests/TxnOffsetCommitRequest.java | 3 +-
.../clients/producer/internals/SenderTest.java | 2 +-
.../internals/TransactionManagerTest.java | 209 +++++++++++++------
.../kafka/api/TransactionsTest.scala | 1 +
12 files changed, 259 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4fcbcc8..dc6b911 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -697,7 +697,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions " +
"when transactions are enabled.");
- if (transactionManager.isInErrorState()) {
+ if (transactionManager.hasError()) {
Exception lastError = transactionManager.lastError();
throw new KafkaException("Cannot perform send because at least one previous transactional or " +
"idempotent request has failed with errors.", lastError);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index d3d1b82..330c244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -621,7 +621,6 @@ public final class RecordAccumulator {
void abortBatches(final RuntimeException reason) {
for (ProducerBatch batch : incomplete.all()) {
Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
- // Close the batch before aborting
synchronized (dq) {
batch.abort();
dq.remove(batch);
@@ -631,6 +630,24 @@ public final class RecordAccumulator {
}
}
+ void abortUnclosedBatches(RuntimeException reason) {
+ for (ProducerBatch batch : incomplete.all()) {
+ Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
+ boolean aborted = false;
+ synchronized (dq) {
+ if (!batch.isClosed()) {
+ aborted = true;
+ batch.abort();
+ dq.remove(batch);
+ }
+ }
+ if (aborted) {
+ batch.done(-1L, RecordBatch.NO_TIMESTAMP, reason);
+ deallocate(batch);
+ }
+ }
+ }
+
public void mutePartition(TopicPartition tp) {
muted.add(tp);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 3fa5903..f498f7d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -46,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceRequest;
@@ -204,7 +206,7 @@ public class Sender implements Runnable {
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
- if (transactionManager.isInErrorState() || !transactionManager.hasProducerId()) {
+ if (transactionManager.hasError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
@@ -295,24 +297,38 @@ public class Sender implements Runnable {
}
private boolean maybeSendTransactionalRequest(long now) {
- TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
- if (nextRequestHandler == null) {
- log.trace("TransactionalId: {} -- There are no pending transactional requests to send",
- transactionManager.transactionalId());
- return false;
- }
-
- if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+ String transactionalId = transactionManager.transactionalId();
+ if (transactionManager.isCompletingTransaction() &&
+ !transactionManager.hasPartitionsToAdd() &&
+ accumulator.hasUnflushedBatches()) {
+
+ // If the transaction is being aborted, then we can clear any unsent produce requests
+ if (transactionManager.isAborting())
+ accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted"));
+
+ // There may still be requests left which are being retried. Since we do not know whether they had
+ // been successfully appended to the broker log, we must resend them until their final status is clear.
+ // If they had been appended and we did not receive the error, then our sequence number would no longer
+ // be correct which would lead to an OutOfSequenceException.
if (!accumulator.flushInProgress())
accumulator.beginFlush();
- transactionManager.reenqueue(nextRequestHandler);
- log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " +
- "end transaction request", transactionManager.transactionalId());
+
+ // Do not send the EndTxn until all pending batches have been completed
+ if (accumulator.hasUnflushedBatches()) {
+ log.trace("TransactionalId: {} -- Waiting for pending batches to be flushed before completing transaction",
+ transactionalId);
+ return false;
+ }
+ }
+
+ TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
+ if (nextRequestHandler == null) {
+ log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionalId);
return false;
}
- log.debug("TransactionalId: {} -- Sending transactional request {}", transactionManager.transactionalId(),
- nextRequestHandler.requestBuilder());
+ AbstractRequest.Builder<?> requestBuilder = nextRequestHandler.requestBuilder();
+ log.trace("TransactionalId: {} -- Preparing to send request {}", transactionalId, requestBuilder);
while (true) {
Node targetNode = null;
@@ -332,25 +348,25 @@ public class Sender implements Runnable {
}
if (targetNode != null) {
if (nextRequestHandler.isRetry()) {
- log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}",
- transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
+ log.trace("TransactionalId: {} -- Waiting {}ms before resending request {}",
+ transactionalId,
+ retryBackoffMs, requestBuilder);
time.sleep(retryBackoffMs);
}
ClientRequest clientRequest = client.newClientRequest(targetNode.idString(),
- nextRequestHandler.requestBuilder(), now, true, nextRequestHandler);
+ requestBuilder, now, true, nextRequestHandler);
transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
- log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
- nextRequestHandler.requestBuilder(), clientRequest.destination());
+ log.debug("TransactionalId: {} -- Sending transactional request {} to node {}",
+ transactionalId, requestBuilder, clientRequest.destination());
client.send(clientRequest, now);
return true;
}
} catch (IOException e) {
- log.debug("TransactionalId: {} -- Disconnect from {} while trying to send transactional " +
- "request {}. Going to back off and retry", transactionManager.transactionalId(),
- targetNode, nextRequestHandler.requestBuilder());
+ log.debug("TransactionalId: {} -- Disconnect from {} while trying to send request {}. Going " +
+ "to back off and retry", transactionalId, targetNode, requestBuilder);
}
- log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
- transactionManager.transactionalId(), retryBackoffMs);
+ log.trace("TransactionalId: {} -- About to wait for {}ms before trying to send another request.",
+ transactionalId, retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
@@ -402,7 +418,7 @@ public class Sender implements Runnable {
}
private void maybeWaitForProducerId() {
- while (!transactionManager.hasProducerId() && !transactionManager.isInErrorState()) {
+ while (!transactionManager.hasProducerId() && !transactionManager.hasError()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ec7ced2..30fff86 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -67,8 +67,8 @@ public class TransactionManager {
private final Map<TopicPartition, Integer> sequenceNumbers;
private final PriorityQueue<TxnRequestHandler> pendingRequests;
- private final Set<TopicPartition> newPartitionsToBeAddedToTransaction;
- private final Set<TopicPartition> pendingPartitionsToBeAddedToTransaction;
+ private final Set<TopicPartition> newPartitionsInTransaction;
+ private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
private final Map<TopicPartition, CommittedOffset> pendingTxnOffsetCommits;
@@ -139,8 +139,8 @@ public class TransactionManager {
this.transactionTimeoutMs = transactionTimeoutMs;
this.transactionCoordinator = null;
this.consumerGroupCoordinator = null;
- this.newPartitionsToBeAddedToTransaction = new HashSet<>();
- this.pendingPartitionsToBeAddedToTransaction = new HashSet<>();
+ this.newPartitionsInTransaction = new HashSet<>();
+ this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
this.pendingTxnOffsetCommits = new HashMap<>();
this.pendingRequests = new PriorityQueue<>(10, new Comparator<TxnRequestHandler>() {
@@ -184,11 +184,14 @@ public class TransactionManager {
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
transitionTo(State.ABORTING_TRANSACTION);
+
+ // We're aborting the transaction, so there should be no need to add new partitions
+ newPartitionsInTransaction.clear();
return beginCompletingTransaction(false);
}
private TransactionalRequestResult beginCompletingTransaction(boolean isCommit) {
- if (!newPartitionsToBeAddedToTransaction.isEmpty()) {
+ if (!newPartitionsInTransaction.isEmpty()) {
pendingRequests.add(addPartitionsToTransactionHandler());
}
@@ -222,7 +225,7 @@ public class TransactionManager {
if (partitionsInTransaction.contains(topicPartition))
return;
- newPartitionsToBeAddedToTransaction.add(topicPartition);
+ newPartitionsInTransaction.add(topicPartition);
}
public RuntimeException lastError() {
@@ -241,14 +244,22 @@ public class TransactionManager {
return transactionalId != null;
}
+ public synchronized boolean hasPartitionsToAdd() {
+ return !newPartitionsInTransaction.isEmpty() || !pendingPartitionsInTransaction.isEmpty();
+ }
+
public synchronized boolean isCompletingTransaction() {
return currentState == State.COMMITTING_TRANSACTION || currentState == State.ABORTING_TRANSACTION;
}
- public synchronized boolean isInErrorState() {
+ public synchronized boolean hasError() {
return currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR;
}
+ public synchronized boolean isAborting() {
+ return currentState == State.ABORTING_TRANSACTION;
+ }
+
synchronized boolean isInTransaction() {
return currentState == State.IN_TRANSACTION || isCompletingTransaction();
}
@@ -334,7 +345,7 @@ public class TransactionManager {
}
synchronized TxnRequestHandler nextRequestHandler() {
- if (!newPartitionsToBeAddedToTransaction.isEmpty())
+ if (!newPartitionsInTransaction.isEmpty())
pendingRequests.add(addPartitionsToTransactionHandler());
TxnRequestHandler nextRequestHandler = pendingRequests.poll();
@@ -345,15 +356,16 @@ public class TransactionManager {
}
if (nextRequestHandler != null && nextRequestHandler.isEndTxn() && !transactionStarted) {
- ((EndTxnHandler) nextRequestHandler).result.done();
+ nextRequestHandler.result.done();
if (currentState != State.FATAL_ERROR) {
+ log.debug("TransactionId: {} -- Not sending EndTxn for completed transaction since no partitions " +
+ "or offsets were successfully added", transactionalId);
completeTransaction();
}
return pendingRequests.poll();
}
return nextRequestHandler;
-
}
synchronized void retry(TxnRequestHandler request) {
@@ -403,7 +415,7 @@ public class TransactionManager {
}
// visible for testing
- synchronized boolean isReadyForTransaction() {
+ synchronized boolean isReady() {
return isTransactional() && currentState == State.READY;
}
@@ -434,12 +446,12 @@ public class TransactionManager {
}
private void maybeFailWithError() {
- if (isInErrorState())
+ if (hasError())
throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
}
private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
- if (isInErrorState()) {
+ if (hasError()) {
if (requestHandler instanceof EndTxnHandler) {
// we allow abort requests to break out of the error state. The state and the last error
// will be cleared when the request returns
@@ -477,10 +489,10 @@ public class TransactionManager {
}
private synchronized TxnRequestHandler addPartitionsToTransactionHandler() {
- pendingPartitionsToBeAddedToTransaction.addAll(newPartitionsToBeAddedToTransaction);
- newPartitionsToBeAddedToTransaction.clear();
+ pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);
+ newPartitionsInTransaction.clear();
AddPartitionsToTxnRequest.Builder builder = new AddPartitionsToTxnRequest.Builder(transactionalId,
- producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsToBeAddedToTransaction));
+ producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, new ArrayList<>(pendingPartitionsInTransaction));
return new AddPartitionsToTxnHandler(builder);
}
@@ -497,7 +509,7 @@ public class TransactionManager {
return new TxnOffsetCommitHandler(result, builder);
}
- abstract class TxnRequestHandler implements RequestCompletionHandler {
+ abstract class TxnRequestHandler implements RequestCompletionHandler {
protected final TransactionalRequestResult result;
private boolean isRetry = false;
@@ -658,12 +670,13 @@ public class TransactionManager {
log.debug("TransactionalId {} -- Received AddPartitionsToTxn response with errors {}",
transactionalId, errors);
- for (TopicPartition topicPartition : pendingPartitionsToBeAddedToTransaction) {
- final Errors error = errors.get(topicPartition);
- if (error == Errors.NONE || error == null) {
+ for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors.entrySet()) {
+ TopicPartition topicPartition = topicPartitionErrorEntry.getKey();
+ Errors error = topicPartitionErrorEntry.getValue();
+
+ if (error == Errors.NONE) {
continue;
- }
- if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
+ } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
reenqueue();
return;
@@ -695,8 +708,9 @@ public class TransactionManager {
} else if (hasPartitionErrors) {
abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
} else {
- partitionsInTransaction.addAll(pendingPartitionsToBeAddedToTransaction);
- pendingPartitionsToBeAddedToTransaction.clear();
+ Set<TopicPartition> addedPartitions = errors.keySet();
+ partitionsInTransaction.addAll(addedPartitions);
+ pendingPartitionsInTransaction.removeAll(addedPartitions);
transactionStarted = true;
result.done();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
index 3339470..36b290f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
@@ -54,7 +54,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append("(transactionalId=").append(transactionalId).
+ bld.append("(type=AddOffsetsToTxnRequest").
+ append(", transactionalId=").append(transactionalId).
append(", producerId=").append(producerId).
append(", producerEpoch=").append(producerEpoch).
append(", consumerGroupId=").append(consumerGroupId).
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
index e24fa5a..6fe034c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
@@ -55,10 +55,15 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
}
+ public List<TopicPartition> partitions() {
+ return partitions;
+ }
+
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append("(transactionalId=").append(transactionalId).
+ bld.append("(type=AddPartitionsToTxnRequest").
+ append(", transactionalId=").append(transactionalId).
append(", producerId=").append(producerId).
append(", producerEpoch=").append(producerEpoch).
append(", partitions=").append(partitions).
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index b9f052c..01d73b2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -54,7 +54,8 @@ public class EndTxnRequest extends AbstractRequest {
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append("(transactionalId=").append(transactionalId).
+ bld.append("(type=EndTxnRequest").
+ append(", transactionalId=").append(transactionalId).
append(", producerId=").append(producerId).
append(", producerEpoch=").append(producerEpoch).
append(", result=").append(result).
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index fc7d53c..39c027b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -151,7 +151,7 @@ public class FetchRequest extends AbstractRequest {
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append("(type:FetchRequest").
+ bld.append("(type=FetchRequest").
append(", replicaId=").append(replicaId).
append(", maxWait=").append(maxWait).
append(", minBytes=").append(minBytes).
@@ -163,11 +163,6 @@ public class FetchRequest extends AbstractRequest {
}
private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
- LinkedHashMap<TopicPartition, PartitionData> fetchData) {
- this(version, replicaId, maxWait, minBytes, maxBytes, fetchData, IsolationLevel.READ_UNCOMMITTED);
- }
-
- private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
LinkedHashMap<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel) {
super(version);
this.replicaId = replicaId;
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
index 68fa3d2..2ea8ecf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java
@@ -67,7 +67,8 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
@Override
public String toString() {
StringBuilder bld = new StringBuilder();
- bld.append("(transactionalId=").append(transactionalId).
+ bld.append("(type=TxnOffsetCommitRequest").
+ append(", transactionalId=").append(transactionalId).
append(", producerId=").append(producerId).
append(", producerEpoch=").append(producerEpoch).
append(", consumerGroupId=").append(consumerGroupId).
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 77b1da8..faa6ea5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -403,7 +403,7 @@ public class SenderTest {
client.setNode(new Node(1, "localhost", 33343));
prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFalse(transactionManager.hasProducerId());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof ClusterAuthorizationException);
// cluster authorization is a fatal error for the producer
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index a1bd970..ed7ec84 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -59,7 +60,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Test;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -69,6 +69,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -79,7 +80,7 @@ import static org.junit.Assert.fail;
public class TransactionManagerTest {
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final short ACKS_ALL = -1;
- private static final int MAX_RETRIES = 0;
+ private static final int MAX_RETRIES = Integer.MAX_VALUE;
private static final String CLIENT_ID = "clientId";
private static final int MAX_BLOCK_TIMEOUT = 1000;
private static final int REQUEST_TIMEOUT = 1000;
@@ -109,19 +110,8 @@ public class TransactionManagerTest {
this.transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs);
Metrics metrics = new Metrics(metricConfig, time);
this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionManager);
- this.sender = new Sender(this.client,
- this.metadata,
- this.accumulator,
- true,
- MAX_REQUEST_SIZE,
- ACKS_ALL,
- MAX_RETRIES,
- metrics,
- this.time,
- REQUEST_TIMEOUT,
- 50,
- transactionManager,
- apiVersions);
+ this.sender = new Sender(this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL,
+ MAX_RETRIES, metrics, this.time, REQUEST_TIMEOUT, 50, transactionManager, apiVersions);
this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
client.setNode(brokerNode);
}
@@ -163,7 +153,7 @@ public class TransactionManagerTest {
transactionManager.maybeAddPartitionToTransaction(tp0);
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
- "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -271,7 +261,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // find coordinator
sender.run(time.milliseconds());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
sender.run(time.milliseconds()); // one more run to fail the InitProducerId future
@@ -294,7 +284,7 @@ public class TransactionManagerTest {
prepareInitPidResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, false, pid, RecordBatch.NO_PRODUCER_EPOCH);
sender.run(time.milliseconds());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(initPidResult.isCompleted());
assertFalse(initPidResult.isSuccessful());
assertTrue(initPidResult.error() instanceof TransactionalIdAuthorizationException);
@@ -321,7 +311,7 @@ public class TransactionManagerTest {
prepareFindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, false, CoordinatorType.GROUP, consumerGroupId);
sender.run(time.milliseconds()); // FindCoordinator Failed
sender.run(time.milliseconds()); // TxnOffsetCommit Aborted
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
@@ -356,7 +346,7 @@ public class TransactionManagerTest {
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.GROUP_AUTHORIZATION_FAILED));
sender.run(time.milliseconds()); // TxnOffsetCommit Handled
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof GroupAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
@@ -384,7 +374,7 @@ public class TransactionManagerTest {
prepareAddOffsetsToTxnResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, consumerGroupId, pid, epoch);
sender.run(time.milliseconds()); // AddOffsetsToTxn Handled
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
@@ -416,7 +406,7 @@ public class TransactionManagerTest {
prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED));
sender.run(time.milliseconds()); // TxnOffsetCommit Handled
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
assertTrue(sendOffsetsResult.isCompleted());
assertFalse(sendOffsetsResult.isSuccessful());
@@ -439,7 +429,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(tp, Errors.TOPIC_AUTHORIZATION_FAILED);
sender.run(time.milliseconds());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
@@ -462,7 +452,7 @@ public class TransactionManagerTest {
prepareAddPartitionsToTxn(tp, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
sender.run(time.milliseconds());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertTrue(transactionManager.lastError() instanceof TransactionalIdAuthorizationException);
assertFatalError(TransactionalIdAuthorizationException.class);
@@ -584,7 +574,7 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // send produce.
assertTrue(responseFuture.isDone());
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
responseFuture.get();
}
@@ -608,16 +598,22 @@ public class TransactionManagerTest {
sender.run(time.milliseconds()); // Send AddPartitionsRequest
assertFalse(commitResult.isCompleted());
-
sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException.
+
sender.run(time.milliseconds()); // try to commit.
assertTrue(commitResult.isCompleted()); // commit should be cancelled with exception without being sent.
try {
commitResult.await();
fail(); // the get() must throw an exception.
- } catch (RuntimeException e) {
- assertTrue(e instanceof KafkaException);
+ } catch (KafkaException e) {
+ }
+
+ try {
+ responseFuture.get();
+ fail("Expected produce future to raise an exception");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
}
// Commit is not allowed, so let's abort and try again.
@@ -627,7 +623,7 @@ public class TransactionManagerTest {
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
- assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now.
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
}
@Test
@@ -643,20 +639,125 @@ public class TransactionManagerTest {
Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
"value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
sender.run(time.milliseconds()); // Send AddPartitionsRequest
- assertFalse(abortResult.isCompleted());
-
sender.run(time.milliseconds()); // Send Produce Request, returns OutOfOrderSequenceException.
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ sender.run(time.milliseconds()); // try to abort
+ assertTrue(abortResult.isCompleted());
+ assertTrue(abortResult.isSuccessful());
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
+ }
+
+ @Test
+ public void testCancelUnsentAddPartitionsAndProduceOnAbort() throws InterruptedException {
+ final long pid = 13131L;
+ final short epoch = 1;
+
+ doInitTransactions(pid, epoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+ assertFalse(responseFuture.isDone());
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+ // note since no partitions were added to the transaction, no EndTxn will be sent
+
sender.run(time.milliseconds()); // try to abort
assertTrue(abortResult.isCompleted());
assertTrue(abortResult.isSuccessful());
- assertTrue(transactionManager.isReadyForTransaction()); // make sure we are ready for a transaction now.
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
+
+ try {
+ responseFuture.get();
+ fail("Expected produce future to raise an exception");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof KafkaException);
+ }
+ }
+
+ @Test
+ public void testAbortResendsAddPartitionErrorIfRetried() throws InterruptedException {
+ final long producerId = 13131L;
+ final short producerEpoch = 1;
+
+ doInitTransactions(producerId, producerEpoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION, tp0, producerEpoch, producerId);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+ sender.run(time.milliseconds()); // Send AddPartitions and let it fail
+ assertFalse(responseFuture.isDone());
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+ // we should resend the AddPartitions
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
+
+ sender.run(time.milliseconds()); // Resend AddPartitions
+ sender.run(time.milliseconds()); // Send EndTxn
+
+ assertTrue(abortResult.isCompleted());
+ assertTrue(abortResult.isSuccessful());
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
+
+ try {
+ responseFuture.get();
+ fail("Expected produce future to raise an exception");
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof KafkaException);
+ }
+ }
+
+ @Test
+ public void testAbortResendsProduceRequestIfRetried() throws Exception {
+ final long producerId = 13131L;
+ final short producerEpoch = 1;
+
+ doInitTransactions(producerId, producerEpoch);
+
+ transactionManager.beginTransaction();
+ transactionManager.maybeAddPartitionToTransaction(tp0);
+ prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, producerEpoch, producerId);
+ prepareProduceResponse(Errors.REQUEST_TIMED_OUT, producerId, producerEpoch);
+
+ Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+ "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+ sender.run(time.milliseconds()); // Send AddPartitions
+ sender.run(time.milliseconds()); // Send ProduceRequest and let it fail
+
+ assertFalse(responseFuture.isDone());
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+
+ // we should resend the ProduceRequest before aborting
+ prepareProduceResponse(Errors.NONE, producerId, producerEpoch);
+ prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, producerEpoch);
+
+ sender.run(time.milliseconds()); // Resend ProduceRequest
+ sender.run(time.milliseconds()); // Send EndTxn
+
+ assertTrue(abortResult.isCompleted());
+ assertTrue(abortResult.isSuccessful());
+ assertTrue(transactionManager.isReady()); // make sure we are ready for a transaction now.
+
+ RecordMetadata recordMetadata = responseFuture.get();
+ assertEquals(tp0.topic(), recordMetadata.topic());
}
@Test
@@ -739,27 +840,18 @@ public class TransactionManagerTest {
@Test
public void shouldNotSendAbortTxnRequestWhenOnlyAddPartitionsRequestFailed() throws Exception {
- client.setNode(brokerNode);
- // This is called from the initTransactions method in the producer as the first order of business.
- // It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
- transactionManager.initializeTransactions();
- prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
- sender.run(time.milliseconds()); // find coordinator
- sender.run(time.milliseconds());
-
- prepareInitPidResponse(Errors.NONE, false, pid, epoch);
- sender.run(time.milliseconds()); // get pid.
+ doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
transactionManager.maybeAddPartitionToTransaction(tp0);
- TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
-
prepareAddPartitionsToTxnResponse(Errors.TOPIC_AUTHORIZATION_FAILED, tp0, epoch, pid);
sender.run(time.milliseconds()); // Send AddPartitionsRequest
+
+ TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
assertFalse(abortResult.isCompleted());
sender.run(time.milliseconds());
@@ -769,19 +861,10 @@ public class TransactionManagerTest {
@Test
public void shouldNotSendAbortTxnRequestWhenOnlyAddOffsetsRequestFailed() throws Exception {
- client.setNode(brokerNode);
- // This is called from the initTransactions method in the producer as the first order of business.
- // It finds the coordinator and then gets a PID.
final long pid = 13131L;
final short epoch = 1;
- transactionManager.initializeTransactions();
- prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
- sender.run(time.milliseconds()); // find coordinator
- sender.run(time.milliseconds());
-
- prepareInitPidResponse(Errors.NONE, false, pid, epoch);
- sender.run(time.milliseconds()); // get pid.
+ doInitTransactions(pid, epoch);
transactionManager.beginTransaction();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
@@ -815,7 +898,7 @@ public class TransactionManagerTest {
assertFalse(responseFuture.isDone());
prepareAddPartitionsToTxn(tp0, error);
sender.run(time.milliseconds()); // attempt send addPartitions.
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
assertFalse(transactionManager.transactionContainsPartition(tp0));
}
@@ -883,7 +966,7 @@ public class TransactionManagerTest {
AddPartitionsToTxnRequest addPartitionsToTxnRequest = (AddPartitionsToTxnRequest) body;
assertEquals(pid, addPartitionsToTxnRequest.producerId());
assertEquals(epoch, addPartitionsToTxnRequest.producerEpoch());
- assertEquals(Arrays.asList(topicPartition), addPartitionsToTxnRequest.partitions());
+ assertEquals(singletonList(topicPartition), addPartitionsToTxnRequest.partitions());
assertEquals(transactionalId, addPartitionsToTxnRequest.transactionalId());
return true;
}
@@ -958,23 +1041,23 @@ public class TransactionManagerTest {
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
}
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
transactionManager.beginAbortingTransaction();
- assertFalse(transactionManager.isInErrorState());
+ assertFalse(transactionManager.hasError());
}
private void assertFatalError(Class<? extends RuntimeException> cause) {
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
try {
transactionManager.beginAbortingTransaction();
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
}
// Transaction abort cannot clear fatal error state
@@ -983,7 +1066,7 @@ public class TransactionManagerTest {
fail("Should have raised " + cause.getSimpleName());
} catch (KafkaException e) {
assertTrue(cause.isAssignableFrom(e.getCause().getClass()));
- assertTrue(transactionManager.isInErrorState());
+ assertTrue(transactionManager.hasError());
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/d41cf1b7/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index fd9d884..205dc6e 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -70,6 +70,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+ producer.flush()
producer.abortTransaction()
producer.beginTransaction()