You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/31 09:28:25 UTC
kafka git commit: KAFKA-5349;
Fix illegal state error in consumer's ListOffset handler
Repository: kafka
Updated Branches:
refs/heads/trunk 31cc8885e -> aebba89a2
KAFKA-5349; Fix illegal state error in consumer's ListOffset handler
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3175 from hachikuji/KAFKA-5349
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aebba89a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aebba89a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aebba89a
Branch: refs/heads/trunk
Commit: aebba89a2b9b5ea6a7cab2599555232ef3fe21ad
Parents: 31cc888
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 10:23:39 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 31 10:27:16 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/consumer/internals/Fetcher.java | 10 ++++++++--
.../clients/consumer/internals/FetcherTest.java | 18 +++++++++++++++++-
2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aebba89a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e3f2355..c2beff8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -680,7 +680,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
* Callback for the response of the list offset call above.
* @param timestampsToSearch The mapping from partitions to target timestamps
* @param listOffsetResponse The response from the server.
- * @param future The future to be completed by the response.
+ * @param future The future to be completed when the response returns. Note that any partition-level errors will
+ * generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
+ * which indicates that the broker does not support the v1 message format and results in a null
+ * being inserted into the resulting map.
*/
@SuppressWarnings("deprecation")
private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch,
@@ -728,13 +731,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.",
topicPartition);
future.raise(error);
+ return;
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition " +
- "may not exist or the user may not have Describe access to it", topicPartition);
+ "may not exist or the user may not have Describe access to it.", topicPartition);
future.raise(error);
+ return;
} else {
log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, error.message());
future.raise(new StaleMetadataException());
+ return;
}
}
if (!future.isDone())
http://git-wip-us.apache.org/repos/asf/kafka/blob/aebba89a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fedec2a..7d48623 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1332,6 +1332,22 @@ public class FetcherTest {
testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
}
+ @Test(expected = TimeoutException.class)
+ public void testBatchedListOffsetsMetadataErrors() {
+ Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+ partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+ ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
+ partitionData.put(tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+ ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET));
+ client.prepareResponse(new ListOffsetResponse(0, partitionData));
+
+ Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
+ offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
+ offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP);
+
+ fetcher.getOffsetsByTimes(offsetsToSearch, 0);
+ }
+
@Test
public void testSkippingAbortedTransactions() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
@@ -1343,7 +1359,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset);
+ abortTransaction(buffer, 1L, currentOffset);
buffer.flip();