You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 18:52:37 UTC
kafka git commit: KAFKA-3392: ConsumerRecords iterator throws
NoSuchElementException when a TopicPartition is empty
Repository: kafka
Updated Branches:
refs/heads/trunk 958e10c87 -> dce06766d
KAFKA-3392: ConsumerRecords iterator throws NoSuchElementException when a TopicPartition is empty
This contribution is my original work, and I license it under the project's open source license.
CC jkreps
Author: Drausin Wulsin <da...@gmail.com>
Author: John Doe <da...@gmail.com>
Reviewers: Jason Gustafson
Closes #1055 from drausin/bugfix/consumer-records-iterator
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dce06766
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dce06766
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dce06766
Branch: refs/heads/trunk
Commit: dce06766da245ca95951c9c7e82d6a113db7cb13
Parents: 958e10c
Author: Drausin Wulsin <da...@gmail.com>
Authored: Thu Mar 17 10:52:33 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 17 10:52:33 2016 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/ConsumerRecords.java | 2 +-
.../clients/consumer/ConsumerRecordsTest.java | 58 ++++++++++++++++++++
2 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 3d7ec60..5b83f0c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -103,7 +103,7 @@ public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
Iterator<ConsumerRecord<K, V>> current;
public ConsumerRecord<K, V> makeNext() {
- if (current == null || !current.hasNext()) {
+ while (current == null || !current.hasNext()) {
if (iters.hasNext())
current = iters.next().iterator();
else
http://git-wip-us.apache.org/repos/asf/kafka/blob/dce06766/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
new file mode 100644
index 0000000..d68a341
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.clients.consumer;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.Test;
+
+public class ConsumerRecordsTest {
+
+ @Test
+ public void iterator() throws Exception {
+
+ Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
+
+ String topic = "topic";
+ records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
+ ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
+ ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
+ records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
+ records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
+
+ ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
+ Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
+
+ int c = 0;
+ for (; iter.hasNext(); c++) {
+ ConsumerRecord<Integer, String> record = iter.next();
+ assertEquals(1, record.partition());
+ assertEquals(topic, record.topic());
+ assertEquals(c, record.offset());
+ }
+ assertEquals(2, c);
+ }
+}
\ No newline at end of file