You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/29 02:20:46 UTC
kafka git commit: KAFKA-5957; Prevent second deallocate if response for aborted batch returns
Repository: kafka
Updated Branches:
refs/heads/trunk 021d8a8e9 -> a86873be5
KAFKA-5957; Prevent second deallocate if response for aborted batch returns
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3942 from hachikuji/KAFKA-5957
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a86873be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a86873be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a86873be
Branch: refs/heads/trunk
Commit: a86873be50b3bf1585098a13b2ce710a717c0321
Parents: 021d8a8
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 29 03:20:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 29 03:20:36 2017 +0100
----------------------------------------------------------------------
.../producer/internals/ProducerBatch.java | 6 +-
.../clients/producer/internals/Sender.java | 9 +-
.../producer/internals/ProducerBatchTest.java | 44 +++--
.../clients/producer/internals/SenderTest.java | 161 ++++++++-----------
4 files changed, 112 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 93c843b..ea0f0f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -163,8 +163,9 @@ public final class ProducerBatch {
* @param baseOffset The base offset of the messages assigned by the server
* @param logAppendTime The log append time or -1 if CreateTime is being used
* @param exception The exception that occurred (or null if the request was successful)
+ * @return true if the batch was completed successfully and false if the batch was previously aborted
*/
- public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
+ public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
final FinalState finalState;
if (exception == null) {
log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
@@ -177,13 +178,14 @@ public final class ProducerBatch {
if (!this.finalState.compareAndSet(null, finalState)) {
if (this.finalState.get() == FinalState.ABORTED) {
log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
- return;
+ return false;
} else {
throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
}
}
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+ return true;
}
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a15a250..8b5780b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -181,6 +181,7 @@ public class Sender implements Runnable {
if (forceClose) {
// We need to fail all the incomplete batches and wake up the threads waiting on
// the futures.
+ log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
@@ -587,8 +588,8 @@ public class Sender implements Runnable {
transactionManager.removeInFlightBatch(batch);
}
- batch.done(response.baseOffset, response.logAppendTime, null);
- this.accumulator.deallocate(batch);
+ if (batch.done(response.baseOffset, response.logAppendTime, null))
+ this.accumulator.deallocate(batch);
}
private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) {
@@ -623,8 +624,8 @@ public class Sender implements Runnable {
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
- batch.done(baseOffset, logAppendTime, exception);
- this.accumulator.deallocate(batch);
+ if (batch.done(baseOffset, logAppendTime, exception))
+ this.accumulator.deallocate(batch);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 41aa5c6..2f89d79 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
@@ -33,7 +34,6 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Deque;
-import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
@@ -64,15 +64,20 @@ public class ProducerBatchTest {
@Test
public void testBatchAbort() throws Exception {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
- FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+ MockCallback callback = new MockCallback();
+ FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
KafkaException exception = new KafkaException();
batch.abort(exception);
assertTrue(future.isDone());
+ assertEquals(1, callback.invocations);
+ assertEquals(exception, callback.exception);
+ assertNull(callback.metadata);
// subsequent completion should be ignored
- batch.done(500L, 2342342341L, null);
- batch.done(-1, -1, new KafkaException());
+ assertFalse(batch.done(500L, 2342342341L, null));
+ assertFalse(batch.done(-1, -1, new KafkaException()));
+ assertEquals(1, callback.invocations);
assertTrue(future.isDone());
try {
@@ -86,9 +91,13 @@ public class ProducerBatchTest {
@Test
public void testBatchCannotAbortTwice() throws Exception {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
- FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+ MockCallback callback = new MockCallback();
+ FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
KafkaException exception = new KafkaException();
batch.abort(exception);
+ assertEquals(1, callback.invocations);
+ assertEquals(exception, callback.exception);
+ assertNull(callback.metadata);
try {
batch.abort(new KafkaException());
@@ -97,6 +106,7 @@ public class ProducerBatchTest {
// expected
}
+ assertEquals(1, callback.invocations);
assertTrue(future.isDone());
try {
future.get();
@@ -109,8 +119,12 @@ public class ProducerBatchTest {
@Test
public void testBatchCannotCompleteTwice() throws Exception {
ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
- FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+ MockCallback callback = new MockCallback();
+ FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
batch.done(500L, 10L, null);
+ assertEquals(1, callback.invocations);
+ assertNull(callback.exception);
+ assertNotNull(callback.metadata);
try {
batch.done(1000L, 20L, null);
@@ -166,9 +180,7 @@ public class ProducerBatchTest {
for (ProducerBatch splitProducerBatch : batches) {
for (RecordBatch splitBatch : splitProducerBatch.records().batches()) {
- Iterator<Record> iter = splitBatch.iterator();
- while (iter.hasNext()) {
- Record record = iter.next();
+ for (Record record : splitBatch) {
assertTrue("Header size should be 1.", record.headers().length == 1);
assertTrue("Header key should be 'header-key'.", record.headers()[0].key().equals("header-key"));
assertTrue("Header value should be 'header-value'.", new String(record.headers()[0].value()).equals("header-value"));
@@ -260,4 +272,18 @@ public class ProducerBatchTest {
assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10], Record.EMPTY_HEADERS));
assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS, null, now + 1));
}
+
+ private static class MockCallback implements Callback {
+ private int invocations = 0;
+ private RecordMetadata metadata;
+ private Exception exception;
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ invocations++;
+ this.metadata = metadata;
+ this.exception = exception;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3a92e63..a32688b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -93,7 +94,6 @@ public class SenderTest {
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final short ACKS_ALL = -1;
- private static final int MAX_RETRIES = 0;
private static final String CLIENT_ID = "clientId";
private static final double EPS = 0.0001;
private static final int MAX_BLOCK_TIMEOUT = 1000;
@@ -349,7 +349,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // resend
}
sender.run(time.milliseconds());
- completedWithError(future, Errors.NETWORK_EXCEPTION);
+ assertFutureFailure(future, NetworkException.class);
} finally {
m.close();
}
@@ -713,14 +713,7 @@ public class SenderTest {
sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L);
sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches.
-
- assertTrue(request1.isDone());
- try {
- request1.get();
- fail("Should have raised an error");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof RecordTooLargeException);
- }
+ assertFutureFailure(request1, RecordTooLargeException.class);
assertEquals(1, client.inFlightRequestCount());
assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -787,14 +780,7 @@ public class SenderTest {
sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
sender.run(time.milliseconds());
- assertTrue(request2.isDone());
-
- try {
- request2.get();
- fail("Expected an OutOfOrderSequenceException");
- } catch (ExecutionException e) {
- assert e.getCause() instanceof OutOfOrderSequenceException;
- }
+ assertFutureFailure(request2, OutOfOrderSequenceException.class);
}
@Test
@@ -966,13 +952,7 @@ public class SenderTest {
sender.run(time.milliseconds());
- assertTrue(request1.isDone());
- try {
- request1.get();
- fail("Should have raised timeout exception");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
+ assertFutureFailure(request1, TimeoutException.class);
assertFalse(transactionManager.hasUnresolvedSequence(tp0));
}
@@ -1004,13 +984,7 @@ public class SenderTest {
client.blackout(node, 10);
sender.run(time.milliseconds()); // now expire the first batch.
- assertTrue(request1.isDone());
- try {
- request1.get();
- fail("Should have raised timeout exception");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
+ assertFutureFailure(request1, TimeoutException.class);
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
// let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1068,13 +1042,7 @@ public class SenderTest {
client.blackout(node, 10);
sender.run(time.milliseconds()); // now expire the first batch.
- assertTrue(request1.isDone());
- try {
- request1.get();
- fail("Should have raised timeout exception");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
+ assertFutureFailure(request1, TimeoutException.class);
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
// let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1086,14 +1054,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // send second request
sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
- assertTrue(request2.isDone());
-
- try {
- request2.get();
- fail("should have failed with an exception");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
- }
+ assertFutureFailure(request2, OutOfOrderSequenceException.class);
Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
@@ -1132,13 +1093,7 @@ public class SenderTest {
sender.run(time.milliseconds()); // now expire the batch.
- assertTrue(request1.isDone());
- try {
- request1.get();
- fail("Should have raised timeout exception");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof TimeoutException);
- }
+ assertFutureFailure(request1, TimeoutException.class);
assertTrue(transactionManager.hasUnresolvedSequence(tp0));
assertFalse(client.hasInFlightRequests());
Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
@@ -1444,14 +1399,7 @@ public class SenderTest {
sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset
-
- assertTrue(request2.isDone());
- try {
- request2.get();
- fail("Should have raised an OutOfOrderSequenceException");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
- }
+ assertFutureFailure(request2, OutOfOrderSequenceException.class);
}
void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
@@ -1497,13 +1445,7 @@ public class SenderTest {
}, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
sender.run(time.milliseconds());
- assertTrue(future.isDone());
- try {
- future.get();
- fail("Future should have raised ClusterAuthorizationException");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof ClusterAuthorizationException);
- }
+ assertFutureFailure(future, ClusterAuthorizationException.class);
// cluster authorization errors are fatal, so we should continue seeing it on future sends
assertTrue(transactionManager.hasFatalError());
@@ -1511,6 +1453,49 @@ public class SenderTest {
}
@Test
+ public void testCancelInFlightRequestAfterFatalError() throws Exception {
+ final long producerId = 343434L;
+ TransactionManager transactionManager = new TransactionManager();
+ setupWithTransactionState(transactionManager);
+
+ client.setNode(new Node(1, "localhost", 33343));
+ prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+ assertTrue(transactionManager.hasProducerId());
+
+ // cluster authorization is a fatal error for the producer
+ Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+ null, null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds());
+
+ Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+ null, null, MAX_BLOCK_TIMEOUT).future;
+ sender.run(time.milliseconds());
+
+ client.respond(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
+ }
+ }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
+
+ sender.run(time.milliseconds());
+ assertTrue(transactionManager.hasFatalError());
+ assertFutureFailure(future1, ClusterAuthorizationException.class);
+
+ sender.run(time.milliseconds());
+ assertFutureFailure(future2, ClusterAuthorizationException.class);
+
+ // Should be fine if the second response eventually returns
+ client.respond(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
+ }
+ }, produceResponse(tp1, 0, Errors.NONE, 0));
+ sender.run(time.milliseconds());
+ }
+
+ @Test
public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
final long producerId = 343434L;
TransactionManager transactionManager = new TransactionManager();
@@ -1530,13 +1515,7 @@ public class SenderTest {
}, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
sender.run(time.milliseconds());
- assertTrue(future.isDone());
- try {
- future.get();
- fail("Future should have raised UnsupportedForMessageFormat");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof UnsupportedForMessageFormatException);
- }
+ assertFutureFailure(future, UnsupportedForMessageFormatException.class);
// unsupported for message format is not a fatal error
assertFalse(transactionManager.hasError());
@@ -1562,13 +1541,7 @@ public class SenderTest {
});
sender.run(time.milliseconds());
- assertTrue(future.isDone());
- try {
- future.get();
- fail("Future should have raised UnsupportedVersionException");
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof UnsupportedVersionException);
- }
+ assertFutureFailure(future, UnsupportedVersionException.class);
// unsupported version errors are fatal, so we should continue seeing it on future sends
assertTrue(transactionManager.hasFatalError());
@@ -1826,16 +1799,6 @@ public class SenderTest {
};
}
- private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
- assertTrue("Request should be completed", future.isDone());
- try {
- future.get();
- fail("Should have thrown an exception.");
- } catch (ExecutionException e) {
- assertEquals(error.exception().getClass(), e.getCause().getClass());
- }
- }
-
private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
@@ -1906,4 +1869,16 @@ public class SenderTest {
client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
}
+ private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)
+ throws InterruptedException {
+ assertTrue(future.isDone());
+ try {
+ future.get();
+ fail("Future should have raised " + expectedExceptionType.getName());
+ } catch (ExecutionException e) {
+ Class<? extends Throwable> causeType = e.getCause().getClass();
+ assertTrue("Unexpected cause " + causeType.getName(), expectedExceptionType.isAssignableFrom(causeType));
+ }
+ }
+
}