You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2018/11/20 17:46:49 UTC

[kafka] branch trunk updated: KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d4ca5a  KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return
1d4ca5a is described below

commit 1d4ca5adf360224940984d20d819a85c94d95144
Author: Stig Rohde Døssing <st...@gmail.com>
AuthorDate: Tue Nov 20 23:16:21 2018 +0530

    KAFKA-7616; Make MockConsumer only add entries to the partition map returned by poll() if there are any records to return
    
    …returned by poll() if there are any records to return
    
    The MockConsumer behaves unlike the real consumer in that poll() can return a ConsumerRecords whose partition map is non-empty even though its record count is 0, because partitions are added to the map with empty record lists. This change makes the MockConsumer add a partition to the returned ConsumerRecords only if there are records to return for that partition.
    
    A unit test in MockConsumerTest demonstrates the issue.
    
    Author: Stig Rohde Døssing <st...@gmail.com>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    
    Closes #5901 from srdo/KAFKA-7616
---
 .../org/apache/kafka/clients/consumer/MockConsumer.java    |  5 +----
 .../apache/kafka/clients/consumer/MockConsumerTest.java    | 14 ++++++++++++++
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index f1dcb32..e43c292 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -183,16 +183,13 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
 
         // update the consumed offset
         final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
-        for (final TopicPartition topicPartition : records.keySet()) {
-            results.put(topicPartition, new ArrayList<>());
-        }
 
         for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
             if (!subscriptions.isPaused(entry.getKey())) {
                 final List<ConsumerRecord<K, V>> recs = entry.getValue();
                 for (final ConsumerRecord<K, V> rec : recs) {
                     if (assignment().contains(entry.getKey()) && rec.offset() >= subscriptions.position(entry.getKey())) {
-                        results.get(entry.getKey()).add(rec);
+                        results.computeIfAbsent(entry.getKey(), partition -> new ArrayList<>()).add(rec);
                         subscriptions.position(entry.getKey(), rec.offset() + 1);
                     }
                 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 1d01eb6..03013e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -26,8 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 
 public class MockConsumerTest {
     
@@ -84,4 +86,16 @@ public class MockConsumerTest {
         assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
     }
 
+    @Test
+    public void testConsumerRecordsIsEmptyWhenReturningNoRecords() {
+        TopicPartition partition = new TopicPartition("test", 0);
+        consumer.assign(Collections.singleton(partition));
+        consumer.addRecord(new ConsumerRecord<String, String>("test", 0, 0, null, null));
+        consumer.updateEndOffsets(Collections.singletonMap(partition, 1L));
+        consumer.seekToEnd(Collections.singleton(partition));
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
+        assertThat(records.count(), is(0));
+        assertThat(records.isEmpty(), is(true));
+    }
+
 }