You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/04/17 06:23:31 UTC

kafka git commit: KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased

Repository: kafka
Updated Branches:
  refs/heads/trunk 148f8c254 -> 17ce2a730


KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased

Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <li...@users.noreply.github.com>

Reviewers: Jiangjie Qin <be...@gmail.com>

Closes #2859 from lindong28/KAFKA-5075


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17ce2a73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17ce2a73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17ce2a73

Branch: refs/heads/trunk
Commit: 17ce2a7307222d7476eb60318fab2e672eebe559
Parents: 148f8c2
Author: Dong Lin <li...@gmail.com>
Authored: Sun Apr 16 23:23:25 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun Apr 16 23:23:25 2017 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     | 81 ++++++++++++++------
 .../clients/consumer/internals/FetcherTest.java | 77 +++++++++++++++++++
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +
 3 files changed, 135 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2eeef11..6b0adc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -99,8 +99,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
-
     private PartitionRecords nextInLineRecords = null;
+    private ExceptionMetadata nextInLineExceptionMetadata = null;
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
@@ -467,37 +467,55 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      *         the defaultResetPolicy is NONE
      */
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+        if (nextInLineExceptionMetadata != null) {
+            ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
+            nextInLineExceptionMetadata = null;
+            TopicPartition tp = exceptionMetadata.partition;
+            if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
+                throw exceptionMetadata.exception;
+        }
         Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
         int recordsRemaining = maxPollRecords;
+        // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch
+        TopicPartition fetchedPartition = null;
+        long fetchedOffset = -1;
 
-        while (recordsRemaining > 0) {
-            if (nextInLineRecords == null || nextInLineRecords.isFetched) {
-                CompletedFetch completedFetch = completedFetches.poll();
-                if (completedFetch == null)
-                    break;
-
-                nextInLineRecords = parseCompletedFetch(completedFetch);
-            } else {
-                TopicPartition partition = nextInLineRecords.partition;
-                List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
-                if (!records.isEmpty()) {
-                    List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
-                    if (currentRecords == null) {
-                        fetched.put(partition, records);
-                    } else {
-                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
-                        // but it might conceivably happen in some rare cases (such as partition leader changes).
-                        // we have to copy to a new list because the old one may be immutable
-                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
-                        newRecords.addAll(currentRecords);
-                        newRecords.addAll(records);
-                        fetched.put(partition, newRecords);
+        try {
+            while (recordsRemaining > 0) {
+                if (nextInLineRecords == null || nextInLineRecords.isFetched) {
+                    CompletedFetch completedFetch = completedFetches.poll();
+                    if (completedFetch == null) break;
+
+                    fetchedPartition = completedFetch.partition;
+                    fetchedOffset = completedFetch.fetchedOffset;
+                    nextInLineRecords = parseCompletedFetch(completedFetch);
+                } else {
+                    fetchedPartition = nextInLineRecords.partition;
+                    fetchedOffset = nextInLineRecords.nextFetchOffset;
+                    List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
+                    TopicPartition partition = nextInLineRecords.partition;
+                    if (!records.isEmpty()) {
+                        List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
+                        if (currentRecords == null) {
+                            fetched.put(partition, records);
+                        } else {
+                            // this case shouldn't usually happen because we only send one fetch at a time per partition,
+                            // but it might conceivably happen in some rare cases (such as partition leader changes).
+                            // we have to copy to a new list because the old one may be immutable
+                            List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+                            newRecords.addAll(currentRecords);
+                            newRecords.addAll(records);
+                            fetched.put(partition, newRecords);
+                        }
+                        recordsRemaining -= records.size();
                     }
-                    recordsRemaining -= records.size();
                 }
             }
+        } catch (KafkaException e) {
+            if (fetched.isEmpty())
+                throw e;
+            nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);
         }
-
         return fetched;
     }
 
@@ -969,6 +987,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
     }
 
+    private static class ExceptionMetadata {
+        private final TopicPartition partition;
+        private final long fetchedOffset;
+        private final KafkaException exception;
+
+        private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
+            this.partition = partition;
+            this.fetchedOffset = fetchedOffset;
+            this.exception = exception;
+        }
+    }
+
     private static class CompletedFetch {
         private final TopicPartition partition;
         private final long fetchedOffset;
@@ -1189,6 +1219,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     public void close() {
         if (nextInLineRecords != null)
             nextInLineRecords.drain();
+        nextInLineExceptionMetadata = null;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 092f549..b8f493a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -593,6 +593,83 @@ public class FetcherTest {
     }
 
     @Test
+    public void testFetchPositionAfterException() {
+        // Verify that the next fetch offset advances by exactly the number of fetched records when
+        // some fetched partitions cause an exception. This ensures that the consumer won't lose records upon exception.
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptionsNoAutoReset.seek(tp1, 1);
+        subscriptionsNoAutoReset.seek(tp2, 1);
+
+        assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+
+        List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+        List<OffsetOutOfRangeException> exceptions = new ArrayList<>();
+
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException e) {
+            exceptions.add(e);
+        }
+
+        assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1);
+
+        try {
+            for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+                fetchedRecords.addAll(records);
+        } catch (OffsetOutOfRangeException e) {
+            exceptions.add(e);
+        }
+
+        assertEquals(4, subscriptionsNoAutoReset.position(tp2).longValue());
+        assertEquals(3, fetchedRecords.size());
+
+        // Should have received one OffsetOutOfRangeException for partition tp1
+        assertEquals(1, exceptions.size());
+        OffsetOutOfRangeException e = exceptions.get(0);
+        assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+        assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+    }
+
+    @Test
+    public void testSeekBeforeException() {
+        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
+
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1));
+        subscriptionsNoAutoReset.seek(tp1, 1);
+        assertEquals(1, fetcher.sendFetches());
+        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+        client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(0);
+
+        assertEquals(2, fetcher.fetchedRecords().get(tp1).size());
+
+        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
+        subscriptionsNoAutoReset.seek(tp2, 1);
+        assertEquals(1, fetcher.sendFetches());
+        partitions = new HashMap<>();
+        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+            FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+        client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+        consumerClient.poll(0);
+        assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+
+        subscriptionsNoAutoReset.seek(tp2, 10);
+        // Should not throw OffsetOutOfRangeException after the seek
+        assertEquals(0, fetcher.fetchedRecords().size());
+    }
+
+    @Test
     public void testFetchDisconnected() {
         subscriptions.assignFromUser(singleton(tp1));
         subscriptions.seek(tp1, 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1300629..757e216 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -614,6 +614,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
     try {
       consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+      // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException
+      consumeRecords(consumer)
       consumeRecords(consumer)
       Assert.fail("Expected TopicAuthorizationException")
     } catch {