You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2015/12/09 02:49:40 UTC
kafka git commit: KAFKA-2667;
Fix assertion depending on hash map order in
KafkaBasedLogTest.testSendAndReadToEnd
Repository: kafka
Updated Branches:
refs/heads/trunk b5f5be69e -> ee6b5e044
KAFKA-2667; Fix assertion depending on hash map order in KafkaBasedLogTest.testSendAndReadToEnd
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #642 from hachikuji/KAFKA-2667
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee6b5e04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee6b5e04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee6b5e04
Branch: refs/heads/trunk
Commit: ee6b5e044c5a4b8ec996b05da76a21c191a95149
Parents: b5f5be6
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Dec 8 17:48:52 2015 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Dec 8 17:48:52 2015 -0800
----------------------------------------------------------------------
.../kafka/connect/util/KafkaBasedLogTest.java | 29 ++++++++++++++------
1 file changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ee6b5e04/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 2ead813..ab370e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -113,11 +113,17 @@ public class KafkaBasedLogTest {
private KafkaProducer<String, String> producer;
private MockConsumer<String, String> consumer;
- private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+ private Map<TopicPartition, List<ConsumerRecord<String, String>>> consumedRecords = new HashMap<>();
private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
@Override
public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
- consumedRecords.add(record);
+ TopicPartition partition = new TopicPartition(record.topic(), record.partition());
+ List<ConsumerRecord<String, String>> records = consumedRecords.get(partition);
+ if (records == null) {
+ records = new ArrayList<>();
+ consumedRecords.put(partition, records);
+ }
+ records.add(record);
}
};
@@ -200,8 +206,9 @@ public class KafkaBasedLogTest {
assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
assertEquals(2, consumedRecords.size());
- assertEquals(TP0_VALUE, consumedRecords.get(0).value());
- assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+ assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
+ assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
store.stop();
@@ -308,11 +315,15 @@ public class KafkaBasedLogTest {
});
readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
assertTrue(getInvoked.get());
- assertEquals(4, consumedRecords.size());
- assertEquals(TP0_VALUE, consumedRecords.get(0).value());
- assertEquals(TP0_VALUE_NEW, consumedRecords.get(1).value());
- assertEquals(TP1_VALUE, consumedRecords.get(2).value());
- assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+ assertEquals(2, consumedRecords.size());
+
+ assertEquals(2, consumedRecords.get(TP0).size());
+ assertEquals(TP0_VALUE, consumedRecords.get(TP0).get(0).value());
+ assertEquals(TP0_VALUE_NEW, consumedRecords.get(TP0).get(1).value());
+
+ assertEquals(2, consumedRecords.get(TP1).size());
+ assertEquals(TP1_VALUE, consumedRecords.get(TP1).get(0).value());
+ assertEquals(TP1_VALUE_NEW, consumedRecords.get(TP1).get(1).value());
// Cleanup
store.stop();