You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 09:28:25 UTC

kafka git commit: KAFKA-5349; Fix illegal state error in consumer's ListOffset handler

Repository: kafka
Updated Branches:
  refs/heads/trunk 31cc8885e -> aebba89a2


KAFKA-5349; Fix illegal state error in consumer's ListOffset handler

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3175 from hachikuji/KAFKA-5349


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aebba89a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aebba89a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aebba89a

Branch: refs/heads/trunk
Commit: aebba89a2b9b5ea6a7cab2599555232ef3fe21ad
Parents: 31cc888
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 10:23:39 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 10:27:16 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/internals/Fetcher.java | 10 ++++++++--
 .../clients/consumer/internals/FetcherTest.java   | 18 +++++++++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aebba89a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e3f2355..c2beff8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -680,7 +680,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
      * Callback for the response of the list offset call above.
      * @param timestampsToSearch The mapping from partitions to target timestamps
      * @param listOffsetResponse The response from the server.
-     * @param future The future to be completed by the response.
+     * @param future The future to be completed when the response returns. Note that any partition-level errors will
+     *               generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
+     *               which indicates that the broker does not support the v1 message format and results in a null
+     *               being inserted into the resulting map.
      */
     @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
@@ -728,13 +731,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                 log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
                         topicPartition);
                 future.raise(error);
+                return;
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition " +
-                        "may not exist or the user may not have Describe access to it", topicPartition);
+                        "may not exist or the user may not have Describe access to it.", topicPartition);
                 future.raise(error);
+                return;
             } else {
                 log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, error.message());
                 future.raise(new StaleMetadataException());
+                return;
             }
         }
         if (!future.isDone())

http://git-wip-us.apache.org/repos/asf/kafka/blob/aebba89a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fedec2a..7d48623 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1332,6 +1332,22 @@ public class FetcherTest {
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
     }
 
+    @Test(expected = TimeoutException.class)
+    public void testBatchedListOffsetsMetadataErrors() {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+        partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+                ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
+        partitionData.put(tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
+        client.prepareResponse(new ListOffsetResponse(0, partitionData));
+
+        Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
+        offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
+        offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP);
+
+        fetcher.getOffsetsByTimes(offsetsToSearch, 0);
+    }
+
     @Test
     public void testSkippingAbortedTransactions() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
@@ -1343,7 +1359,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset);
+        abortTransaction(buffer, 1L, currentOffset);
 
         buffer.flip();