You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2015/12/09 02:49:40 UTC

kafka git commit: KAFKA-2667; Fix assertion depending on hash map order in KafkaBasedLogTest.testSendAndReadToEnd

Repository: kafka
Updated Branches:
  refs/heads/trunk b5f5be69e -> ee6b5e044


KAFKA-2667; Fix assertion depending on hash map order in KafkaBasedLogTest.testSendAndReadToEnd

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #642 from hachikuji/KAFKA-2667


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee6b5e04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee6b5e04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee6b5e04

Branch: refs/heads/trunk
Commit: ee6b5e044c5a4b8ec996b05da76a21c191a95149
Parents: b5f5be6
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Dec 8 17:48:52 2015 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Dec 8 17:48:52 2015 -0800

----------------------------------------------------------------------
 .../kafka/connect/util/KafkaBasedLogTest.java   | 29 ++++++++++++++------
 1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee6b5e04/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 2ead813..ab370e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -113,11 +113,17 @@ public class KafkaBasedLogTest {
     private KafkaProducer<String, String> producer;
     private MockConsumer<String, String> consumer;
 
-    private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+    private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
     private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
         @Override
         public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
-            consumedRecords.add(record);
+            TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+            List<ConsumerRecord<String, String>> records = consumedRecords.get(partition);
+            if (records == null) {
+                records = new ArrayList<>();
+                consumedRecords.put(partition, records);
+            }
+            records.add(record);
         }
     };
 
@@ -200,8 +206,9 @@ public class KafkaBasedLogTest {
 
         assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         assertEquals(2, consumedRecords.size());
-        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
-        assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+        assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
+        assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
 
         store.stop();
 
@@ -308,11 +315,15 @@ public class KafkaBasedLogTest {
         });
         readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(getInvoked.get());
-        assertEquals(4, consumedRecords.size());
-        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
-        assertEquals(TP0_VALUE_NEW, consumedRecords.get(1).value());
-        assertEquals(TP1_VALUE, consumedRecords.get(2).value());
-        assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+        assertEquals(2, consumedRecords.size());
+
+        assertEquals(2, consumedRecords.get(TP0).size());
+        assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
+        assertEquals(TP0_VALUE_NEW, consumedRecords.get(TP0).get(1).value());
+
+        assertEquals(2, consumedRecords.get(TP1).size());
+        assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
+        assertEquals(TP1_VALUE_NEW, consumedRecords.get(TP1).get(1).value());
 
         // Cleanup
         store.stop();