You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2016/10/08 01:44:46 UTC
kafka git commit: KAFKA-4274;
Consumer `offsetForTimes` times out on empty map
Repository: kafka
Updated Branches:
refs/heads/trunk e876df8b3 -> 6b91f83fb
KAFKA-4274; Consumer `offsetForTimes` times out on empty map
Author: Jiangjie Qin <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #1993 from becketqin/KAFKA-4274
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6b91f83f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6b91f83f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6b91f83f
Branch: refs/heads/trunk
Commit: 6b91f83fbaef26fefb9ec221529c29872a04c004
Parents: e876df8
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Oct 7 18:43:24 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 7 18:43:24 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/kafka/clients/consumer/internals/Fetcher.java | 3 +++
.../org/apache/kafka/clients/consumer/internals/FetcherTest.java | 2 ++
core/src/main/scala/kafka/api/ApiVersion.scala | 2 +-
3 files changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b91f83f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 17ab398..9e9ae92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -362,6 +362,9 @@ public class Fetcher<K, V> {
public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
long timeout) {
+ if (timestampsToSearch.isEmpty())
+ return Collections.emptyMap();
+
long startMs = time.milliseconds();
long remaining = timeout;
do {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b91f83f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4f56796..faf6efa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -648,6 +648,8 @@ public class FetcherTest {
@Test
public void testGetOffsetsForTimes() {
+ // Empty map
+ assertTrue(fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());
// Error code none with unknown offset
testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
// Error code none with known offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/6b91f83f/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 0d9775a..895c1b1 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -57,7 +57,7 @@ object ApiVersion {
"0.10.1-IV0" -> KAFKA_0_10_1_IV0,
// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
"0.10.1-IV1" -> KAFKA_0_10_1_IV1,
- // introduced ListGroupRequest v1 in KIP-79
+ // introduced ListOffsetRequest v1 in KIP-79
"0.10.1-IV2" -> KAFKA_0_10_1_IV2,
"0.10.1" -> KAFKA_0_10_1_IV2