You are viewing a plain-text version of this content; the link to its canonical (archived) version was not preserved in this copy.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/29 02:20:46 UTC

kafka git commit: KAFKA-5957; Prevent second deallocate if response for aborted batch returns

Repository: kafka
Updated Branches:
  refs/heads/trunk 021d8a8e9 -> a86873be5


KAFKA-5957; Prevent second deallocate if response for aborted batch returns

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3942 from hachikuji/KAFKA-5957


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a86873be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a86873be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a86873be

Branch: refs/heads/trunk
Commit: a86873be50b3bf1585098a13b2ce710a717c0321
Parents: 021d8a8
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Sep 29 03:20:36 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 29 03:20:36 2017 +0100

----------------------------------------------------------------------
 .../producer/internals/ProducerBatch.java       |   6 +-
 .../clients/producer/internals/Sender.java      |   9 +-
 .../producer/internals/ProducerBatchTest.java   |  44 +++--
 .../clients/producer/internals/SenderTest.java  | 161 ++++++++-----------
 4 files changed, 112 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 93c843b..ea0f0f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -163,8 +163,9 @@ public final class ProducerBatch {
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
      * @param exception The exception that occurred (or null if the request was successful)
+     * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
+    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
         final FinalState finalState;
         if (exception == null) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
@@ -177,13 +178,14 @@ public final class ProducerBatch {
         if (!this.finalState.compareAndSet(null, finalState)) {
             if (this.finalState.get() == FinalState.ABORTED) {
                 log.debug("ProduceResponse returned for {} after batch had already been aborted.", topicPartition);
-                return;
+                return false;
             } else {
                 throw new IllegalStateException("Batch has already been completed in final state " + this.finalState.get());
             }
         }
 
         completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+        return true;
     }
 
     private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a15a250..8b5780b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -181,6 +181,7 @@ public class Sender implements Runnable {
         if (forceClose) {
             // We need to fail all the incomplete batches and wake up the threads waiting on
             // the futures.
+            log.debug("Aborting incomplete batches due to forced shutdown");
             this.accumulator.abortIncompleteBatches();
         }
         try {
@@ -587,8 +588,8 @@ public class Sender implements Runnable {
             transactionManager.removeInFlightBatch(batch);
         }
 
-        batch.done(response.baseOffset, response.logAppendTime, null);
-        this.accumulator.deallocate(batch);
+        if (batch.done(response.baseOffset, response.logAppendTime, null))
+            this.accumulator.deallocate(batch);
     }
 
     private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception, boolean adjustSequenceNumbers) {
@@ -623,8 +624,8 @@ public class Sender implements Runnable {
 
         this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
 
-        batch.done(baseOffset, logAppendTime, exception);
-        this.accumulator.deallocate(batch);
+        if (batch.done(baseOffset, logAppendTime, exception))
+            this.accumulator.deallocate(batch);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 41aa5c6..2f89d79 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -33,7 +34,6 @@ import org.junit.Test;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Deque;
-import java.util.Iterator;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
@@ -64,15 +64,20 @@ public class ProducerBatchTest {
     @Test
     public void testBatchAbort() throws Exception {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        MockCallback callback = new MockCallback();
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
 
         KafkaException exception = new KafkaException();
         batch.abort(exception);
         assertTrue(future.isDone());
+        assertEquals(1, callback.invocations);
+        assertEquals(exception, callback.exception);
+        assertNull(callback.metadata);
 
         // subsequent completion should be ignored
-        batch.done(500L, 2342342341L, null);
-        batch.done(-1, -1, new KafkaException());
+        assertFalse(batch.done(500L, 2342342341L, null));
+        assertFalse(batch.done(-1, -1, new KafkaException()));
+        assertEquals(1, callback.invocations);
 
         assertTrue(future.isDone());
         try {
@@ -86,9 +91,13 @@ public class ProducerBatchTest {
     @Test
     public void testBatchCannotAbortTwice() throws Exception {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        MockCallback callback = new MockCallback();
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
         KafkaException exception = new KafkaException();
         batch.abort(exception);
+        assertEquals(1, callback.invocations);
+        assertEquals(exception, callback.exception);
+        assertNull(callback.metadata);
 
         try {
             batch.abort(new KafkaException());
@@ -97,6 +106,7 @@ public class ProducerBatchTest {
             // expected
         }
 
+        assertEquals(1, callback.invocations);
         assertTrue(future.isDone());
         try {
             future.get();
@@ -109,8 +119,12 @@ public class ProducerBatchTest {
     @Test
     public void testBatchCannotCompleteTwice() throws Exception {
         ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
-        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now);
+        MockCallback callback = new MockCallback();
+        FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, callback, now);
         batch.done(500L, 10L, null);
+        assertEquals(1, callback.invocations);
+        assertNull(callback.exception);
+        assertNotNull(callback.metadata);
 
         try {
             batch.done(1000L, 20L, null);
@@ -166,9 +180,7 @@ public class ProducerBatchTest {
 
             for (ProducerBatch splitProducerBatch : batches) {
                 for (RecordBatch splitBatch : splitProducerBatch.records().batches()) {
-                    Iterator<Record> iter = splitBatch.iterator();
-                    while (iter.hasNext()) {
-                        Record record = iter.next();
+                    for (Record record : splitBatch) {
                         assertTrue("Header size should be 1.", record.headers().length == 1);
                         assertTrue("Header key should be 'header-key'.", record.headers()[0].key().equals("header-key"));
                         assertTrue("Header value should be 'header-value'.", new String(record.headers()[0].value()).equals("header-value"));
@@ -260,4 +272,18 @@ public class ProducerBatchTest {
         assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10], Record.EMPTY_HEADERS));
         assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS, null, now + 1));
     }
+
+    private static class MockCallback implements Callback {
+        private int invocations = 0;
+        private RecordMetadata metadata;
+        private Exception exception;
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            invocations++;
+            this.metadata = metadata;
+            this.exception = exception;
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a86873be/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3a92e63..a32688b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -93,7 +94,6 @@ public class SenderTest {
 
     private static final int MAX_REQUEST_SIZE = 1024 * 1024;
     private static final short ACKS_ALL = -1;
-    private static final int MAX_RETRIES = 0;
     private static final String CLIENT_ID = "clientId";
     private static final double EPS = 0.0001;
     private static final int MAX_BLOCK_TIMEOUT = 1000;
@@ -349,7 +349,7 @@ public class SenderTest {
                 sender.run(time.milliseconds()); // resend
             }
             sender.run(time.milliseconds());
-            completedWithError(future, Errors.NETWORK_EXCEPTION);
+            assertFutureFailure(future, NetworkException.class);
         } finally {
             m.close();
         }
@@ -713,14 +713,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(0, tp0, Errors.MESSAGE_TOO_LARGE, -1L);
 
         sender.run(time.milliseconds()); // receive response 0, should adjust sequences of future batches.
-
-        assertTrue(request1.isDone());
-        try {
-            request1.get();
-            fail("Should have raised an error");
-        } catch (Exception e) {
-            assertTrue(e.getCause() instanceof RecordTooLargeException);
-        }
+        assertFutureFailure(request1, RecordTooLargeException.class);
 
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(-1, transactionManager.lastAckedSequence(tp0));
@@ -787,14 +780,7 @@ public class SenderTest {
         sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
 
         sender.run(time.milliseconds());
-        assertTrue(request2.isDone());
-
-        try {
-            request2.get();
-            fail("Expected an OutOfOrderSequenceException");
-        } catch (ExecutionException e) {
-            assert e.getCause() instanceof OutOfOrderSequenceException;
-        }
+        assertFutureFailure(request2, OutOfOrderSequenceException.class);
     }
 
     @Test
@@ -966,13 +952,7 @@ public class SenderTest {
 
         sender.run(time.milliseconds());
 
-        assertTrue(request1.isDone());
-        try {
-            request1.get();
-            fail("Should have raised timeout exception");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TimeoutException);
-        }
+        assertFutureFailure(request1, TimeoutException.class);
         assertFalse(transactionManager.hasUnresolvedSequence(tp0));
     }
 
@@ -1004,13 +984,7 @@ public class SenderTest {
         client.blackout(node, 10);
 
         sender.run(time.milliseconds()); // now expire the first batch.
-        assertTrue(request1.isDone());
-        try {
-            request1.get();
-            fail("Should have raised timeout exception");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TimeoutException);
-        }
+        assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1068,13 +1042,7 @@ public class SenderTest {
         client.blackout(node, 10);
 
         sender.run(time.milliseconds()); // now expire the first batch.
-        assertTrue(request1.isDone());
-        try {
-            request1.get();
-            fail("Should have raised timeout exception");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TimeoutException);
-        }
+        assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         // let's enqueue another batch, which should not be dequeued until the unresolved state is clear.
         Future<RecordMetadata> request3 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
@@ -1086,14 +1054,7 @@ public class SenderTest {
         sender.run(time.milliseconds());  // send second request
         sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1);
         sender.run(time.milliseconds()); // receive second response, the third request shouldn't be sent since we are in an unresolved state.
-        assertTrue(request2.isDone());
-
-        try {
-            request2.get();
-            fail("should have failed with an exception");
-        } catch (Exception e) {
-            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
-        }
+        assertFutureFailure(request2, OutOfOrderSequenceException.class);
 
         Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
 
@@ -1132,13 +1093,7 @@ public class SenderTest {
 
         sender.run(time.milliseconds()); // now expire the batch.
 
-        assertTrue(request1.isDone());
-        try {
-            request1.get();
-            fail("Should have raised timeout exception");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof TimeoutException);
-        }
+        assertFutureFailure(request1, TimeoutException.class);
         assertTrue(transactionManager.hasUnresolvedSequence(tp0));
         assertFalse(client.hasInFlightRequests());
         Deque<ProducerBatch> batches = accumulator.batches().get(tp0);
@@ -1444,14 +1399,7 @@ public class SenderTest {
 
         sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
         sender.run(time.milliseconds()); // receive response 0, should cause a producerId reset since the logStartOffset < lastAckedOffset
-
-        assertTrue(request2.isDone());
-        try {
-            request2.get();
-            fail("Should have raised an OutOfOrderSequenceException");
-        } catch (Exception e) {
-            assertTrue(e.getCause() instanceof OutOfOrderSequenceException);
-        }
+        assertFutureFailure(request2, OutOfOrderSequenceException.class);
 
     }
     void sendIdempotentProducerResponse(int expectedSequence, TopicPartition tp, Errors responseError, long responseOffset) {
@@ -1497,13 +1445,7 @@ public class SenderTest {
         }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
 
         sender.run(time.milliseconds());
-        assertTrue(future.isDone());
-        try {
-            future.get();
-            fail("Future should have raised ClusterAuthorizationException");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof ClusterAuthorizationException);
-        }
+        assertFutureFailure(future, ClusterAuthorizationException.class);
 
         // cluster authorization errors are fatal, so we should continue seeing it on future sends
         assertTrue(transactionManager.hasFatalError());
@@ -1511,6 +1453,49 @@ public class SenderTest {
     }
 
     @Test
+    public void testCancelInFlightRequestAfterFatalError() throws Exception {
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+
+        client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        // cluster authorization is a fatal error for the producer
+        Future<RecordMetadata> future1 = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        Future<RecordMetadata> future2 = accumulator.append(tp1, time.milliseconds(), "key".getBytes(), "value".getBytes(),
+                null, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+
+        client.respond(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
+            }
+        }, produceResponse(tp0, -1, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
+
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasFatalError());
+        assertFutureFailure(future1, ClusterAuthorizationException.class);
+
+        sender.run(time.milliseconds());
+        assertFutureFailure(future2, ClusterAuthorizationException.class);
+
+        // Should be fine if the second response eventually returns
+        client.respond(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent();
+            }
+        }, produceResponse(tp1, 0, Errors.NONE, 0));
+        sender.run(time.milliseconds());
+    }
+
+    @Test
     public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();
@@ -1530,13 +1515,7 @@ public class SenderTest {
         }, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
 
         sender.run(time.milliseconds());
-        assertTrue(future.isDone());
-        try {
-            future.get();
-            fail("Future should have raised UnsupportedForMessageFormat");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof UnsupportedForMessageFormatException);
-        }
+        assertFutureFailure(future, UnsupportedForMessageFormatException.class);
 
         // unsupported for message format is not a fatal error
         assertFalse(transactionManager.hasError());
@@ -1562,13 +1541,7 @@ public class SenderTest {
         });
 
         sender.run(time.milliseconds());
-        assertTrue(future.isDone());
-        try {
-            future.get();
-            fail("Future should have raised UnsupportedVersionException");
-        } catch (ExecutionException e) {
-            assertTrue(e.getCause() instanceof UnsupportedVersionException);
-        }
+        assertFutureFailure(future, UnsupportedVersionException.class);
 
         // unsupported version errors are fatal, so we should continue seeing it on future sends
         assertTrue(transactionManager.hasFatalError());
@@ -1826,16 +1799,6 @@ public class SenderTest {
         };
     }
 
-    private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
-        assertTrue("Request should be completed", future.isDone());
-        try {
-            future.get();
-            fail("Should have thrown an exception.");
-        } catch (ExecutionException e) {
-            assertEquals(error.exception().getClass(), e.getCause().getClass());
-        }
-    }
-
     private ProduceResponse produceResponse(TopicPartition tp, long offset, Errors error, int throttleTimeMs, long logStartOffset) {
         ProduceResponse.PartitionResponse resp = new ProduceResponse.PartitionResponse(error, offset, RecordBatch.NO_TIMESTAMP, logStartOffset);
         Map<TopicPartition, ProduceResponse.PartitionResponse> partResp = Collections.singletonMap(tp, resp);
@@ -1906,4 +1869,16 @@ public class SenderTest {
         client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch));
     }
 
+    private void assertFutureFailure(Future<?> future, Class<? extends Exception> expectedExceptionType)
+            throws InterruptedException {
+        assertTrue(future.isDone());
+        try {
+            future.get();
+            fail("Future should have raised " + expectedExceptionType.getName());
+        } catch (ExecutionException e) {
+            Class<? extends Throwable> causeType = e.getCause().getClass();
+            assertTrue("Unexpected cause " + causeType.getName(), expectedExceptionType.isAssignableFrom(causeType));
+        }
+    }
+
 }