You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/04/17 06:23:31 UTC
kafka git commit: KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased
Repository: kafka
Updated Branches:
refs/heads/trunk 148f8c254 -> 17ce2a730
KAFKA-5075; Defer exception to the next pollOnce() if consumer's fetch position has already increased
Author: Dong Lin <li...@gmail.com>
Author: Dong Lin <li...@users.noreply.github.com>
Reviewers: Jiangjie Qin <be...@gmail.com>
Closes #2859 from lindong28/KAFKA-5075
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/17ce2a73
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/17ce2a73
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/17ce2a73
Branch: refs/heads/trunk
Commit: 17ce2a7307222d7476eb60318fab2e672eebe559
Parents: 148f8c2
Author: Dong Lin <li...@gmail.com>
Authored: Sun Apr 16 23:23:25 2017 -0700
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun Apr 16 23:23:25 2017 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 81 ++++++++++++++------
.../clients/consumer/internals/FetcherTest.java | 77 +++++++++++++++++++
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +
3 files changed, 135 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 2eeef11..6b0adc7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -99,8 +99,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
-
private PartitionRecords nextInLineRecords = null;
+ private ExceptionMetadata nextInLineExceptionMetadata = null;
public Fetcher(ConsumerNetworkClient client,
int minBytes,
@@ -467,37 +467,55 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
+ if (nextInLineExceptionMetadata != null) {
+ ExceptionMetadata exceptionMetadata = nextInLineExceptionMetadata;
+ nextInLineExceptionMetadata = null;
+ TopicPartition tp = exceptionMetadata.partition;
+ if (subscriptions.isFetchable(tp) && subscriptions.position(tp) == exceptionMetadata.fetchedOffset)
+ throw exceptionMetadata.exception;
+ }
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
+ // Needed to construct ExceptionMetadata if any exception is found when processing completedFetch
+ TopicPartition fetchedPartition = null;
+ long fetchedOffset = -1;
- while (recordsRemaining > 0) {
- if (nextInLineRecords == null || nextInLineRecords.isFetched) {
- CompletedFetch completedFetch = completedFetches.poll();
- if (completedFetch == null)
- break;
-
- nextInLineRecords = parseCompletedFetch(completedFetch);
- } else {
- TopicPartition partition = nextInLineRecords.partition;
- List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
- if (!records.isEmpty()) {
- List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
- if (currentRecords == null) {
- fetched.put(partition, records);
- } else {
- // this case shouldn't usually happen because we only send one fetch at a time per partition,
- // but it might conceivably happen in some rare cases (such as partition leader changes).
- // we have to copy to a new list because the old one may be immutable
- List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
- newRecords.addAll(currentRecords);
- newRecords.addAll(records);
- fetched.put(partition, newRecords);
+ try {
+ while (recordsRemaining > 0) {
+ if (nextInLineRecords == null || nextInLineRecords.isFetched) {
+ CompletedFetch completedFetch = completedFetches.poll();
+ if (completedFetch == null) break;
+
+ fetchedPartition = completedFetch.partition;
+ fetchedOffset = completedFetch.fetchedOffset;
+ nextInLineRecords = parseCompletedFetch(completedFetch);
+ } else {
+ fetchedPartition = nextInLineRecords.partition;
+ fetchedOffset = nextInLineRecords.nextFetchOffset;
+ List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining);
+ TopicPartition partition = nextInLineRecords.partition;
+ if (!records.isEmpty()) {
+ List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
+ if (currentRecords == null) {
+ fetched.put(partition, records);
+ } else {
+ // this case shouldn't usually happen because we only send one fetch at a time per partition,
+ // but it might conceivably happen in some rare cases (such as partition leader changes).
+ // we have to copy to a new list because the old one may be immutable
+ List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
+ newRecords.addAll(currentRecords);
+ newRecords.addAll(records);
+ fetched.put(partition, newRecords);
+ }
+ recordsRemaining -= records.size();
}
- recordsRemaining -= records.size();
}
}
+ } catch (KafkaException e) {
+ if (fetched.isEmpty())
+ throw e;
+ nextInLineExceptionMetadata = new ExceptionMetadata(fetchedPartition, fetchedOffset, e);
}
-
return fetched;
}
@@ -969,6 +987,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
}
}
+ private static class ExceptionMetadata {
+ private final TopicPartition partition;
+ private final long fetchedOffset;
+ private final KafkaException exception;
+
+ private ExceptionMetadata(TopicPartition partition, long fetchedOffset, KafkaException exception) {
+ this.partition = partition;
+ this.fetchedOffset = fetchedOffset;
+ this.exception = exception;
+ }
+ }
+
private static class CompletedFetch {
private final TopicPartition partition;
private final long fetchedOffset;
@@ -1189,6 +1219,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
public void close() {
if (nextInLineRecords != null)
nextInLineRecords.drain();
+ nextInLineExceptionMetadata = null;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 092f549..b8f493a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -593,6 +593,83 @@ public class FetcherTest {
}
@Test
+ public void testFetchPositionAfterException() {
+ // Verify that the next fetch offset advances by exactly the number of fetched records when
+ // some fetched partitions cause an exception. This ensures that the consumer won't lose records upon an exception.
+ subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
+ subscriptionsNoAutoReset.seek(tp1, 1);
+ subscriptionsNoAutoReset.seek(tp2, 1);
+
+ assertEquals(1, fetcherNoAutoReset.sendFetches());
+
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+ consumerClient.poll(0);
+
+ List<ConsumerRecord<byte[], byte[]>> fetchedRecords = new ArrayList<>();
+ List<OffsetOutOfRangeException> exceptions = new ArrayList<>();
+
+ try {
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+ } catch (OffsetOutOfRangeException e) {
+ exceptions.add(e);
+ }
+
+ assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2).longValue() - 1);
+
+ try {
+ for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values())
+ fetchedRecords.addAll(records);
+ } catch (OffsetOutOfRangeException e) {
+ exceptions.add(e);
+ }
+
+ assertEquals(4, subscriptionsNoAutoReset.position(tp2).longValue());
+ assertEquals(3, fetchedRecords.size());
+
+ // Should have received one OffsetOutOfRangeException for partition tp1
+ assertEquals(1, exceptions.size());
+ OffsetOutOfRangeException e = exceptions.get(0);
+ assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
+ assertEquals(e.offsetOutOfRangePartitions().size(), 1);
+ }
+
+ @Test
+ public void testSeekBeforeException() {
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2);
+
+ subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1));
+ subscriptionsNoAutoReset.seek(tp1, 1);
+ assertEquals(1, fetcher.sendFetches());
+ Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>();
+ partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+ client.prepareResponse(fetchResponse(this.records, Errors.NONE, 100L, 0));
+ consumerClient.poll(0);
+
+ assertEquals(2, fetcher.fetchedRecords().get(tp1).size());
+
+ subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
+ subscriptionsNoAutoReset.seek(tp2, 1);
+ assertEquals(1, fetcher.sendFetches());
+ partitions = new HashMap<>();
+ partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+ client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0));
+ consumerClient.poll(0);
+ assertEquals(1, fetcher.fetchedRecords().get(tp1).size());
+
+ subscriptionsNoAutoReset.seek(tp2, 10);
+ // Should not throw OffsetOutOfRangeException after the seek
+ assertEquals(0, fetcher.fetchedRecords().size());
+ }
+
+ @Test
public void testFetchDisconnected() {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.seek(tp1, 0);
http://git-wip-us.apache.org/repos/asf/kafka/blob/17ce2a73/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1300629..757e216 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -614,6 +614,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig))
try {
consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener)
+ // It is possible that the first call returns records of "topic" and the second call throws TopicAuthorizationException
+ consumeRecords(consumer)
consumeRecords(consumer)
Assert.fail("Expected TopicAuthorizationException")
} catch {