You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 18:52:37 UTC

kafka git commit: KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty

Repository: kafka
Updated Branches:
  refs/heads/trunk 958e10c87 -> dce06766d


KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty

This contribution is my original work, and I license it under the project's open source license.

CC jkreps

Author: Drausin Wulsin <da...@gmail.com>
Author: John Doe <da...@gmail.com>

Reviewers: Jason Gustafson

Closes #1055 from drausin/bugfix/consumer-records-iterator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dce06766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dce06766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dce06766

Branch: refs/heads/trunk
Commit: dce06766da245ca95951c9c7e82d6a113db7cb13
Parents: 958e10c
Author: Drausin Wulsin <da...@gmail.com>
Authored: Thu Mar 17 10:52:33 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 17 10:52:33 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerRecords.java |  2 +-
 .../clients/consumer/ConsumerRecordsTest.java   | 58 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 3d7ec60..5b83f0c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -103,7 +103,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
                 Iterator<ConsumerRecord<K, V>> current;
 
                 public ConsumerRecord<K, V> makeNext() {
-                    if (current == null || !current.hasNext()) {
+                    while (current == null || !current.hasNext()) {
                         if (iters.hasNext())
                             current = iters.next().iterator();
                         else

http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
new file mode 100644
index 0000000..d68a341
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.clients.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+public class ConsumerRecordsTest {
+
+    @Test
+    public void iterator() throws Exception {
+
+        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+
+        String topic = "topic";
+        records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
+        ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
+        ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
+        records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
+        records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
+
+        ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
+        Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
+
+        int c = 0;
+        for (; iter.hasNext(); c++) {
+            ConsumerRecord<Integer, String> record = iter.next();
+            assertEquals(1, record.partition());
+            assertEquals(topic, record.topic());
+            assertEquals(c, record.offset());
+        }
+        assertEquals(2, c);
+    }
+}
\ No newline at end of file