You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2021/05/27 06:23:39 UTC

[kafka] branch trunk updated: KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements (#10762)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56d9482  KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements (#10762)
56d9482 is described below

commit 56d9482462c2aa941b151015499fc59485fe7426
Author: Matthew de Detrich <md...@gmail.com>
AuthorDate: Thu May 27 08:21:45 2021 +0200

    KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements (#10762)
    
    Reviewers: Ryanne Dolan <ry...@gmail.com>, Konstantine Karantasis <k....@gmail.com>
---
 .../kafka/connect/mirror/CheckpointTest.java       | 12 ++--
 .../apache/kafka/connect/mirror/HeartbeatTest.java |  9 ++-
 .../mirror/MirrorCheckpointConnectorTest.java      | 30 ++++++----
 .../connect/mirror/MirrorCheckpointTaskTest.java   | 51 +++++++++++------
 .../connect/mirror/MirrorConnectorConfigTest.java  | 64 ++++++++++++++--------
 .../mirror/MirrorHeartBeatConnectorTest.java       |  4 +-
 .../connect/mirror/MirrorHeartbeatTaskTest.java    |  8 ++-
 .../connect/mirror/MirrorMakerConfigTest.java      | 20 +++----
 .../connect/mirror/MirrorSourceConnectorTest.java  |  8 +--
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 62 ++++++++++++---------
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  | 15 +++--
 .../kafka/connect/mirror/OffsetSyncTest.java       |  9 ++-
 12 files changed, 180 insertions(+), 112 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
index fd5448c..f008f99 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
@@ -32,9 +32,13 @@ public class CheckpointTest {
         byte[] value = checkpoint.recordValue();
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 7, 8, key, value);
         Checkpoint deserialized = Checkpoint.deserializeRecord(record);
-        assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId());
-        assertEquals(checkpoint.topicPartition(), deserialized.topicPartition());
-        assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset());
-        assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset());
+        assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId(),
+                "Failure on checkpoint consumerGroupId serde");
+        assertEquals(checkpoint.topicPartition(), deserialized.topicPartition(),
+                "Failure on checkpoint topicPartition serde");
+        assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset(),
+                "Failure on checkpoint upstreamOffset serde");
+        assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset(),
+                "Failure on checkpoint downstreamOffset serde");
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
index fb473f7..723b0dc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
@@ -31,8 +31,11 @@ public class HeartbeatTest {
         byte[] value = heartbeat.recordValue();
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
         Heartbeat deserialized = Heartbeat.deserializeRecord(record);
-        assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias());
-        assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias());
-        assertEquals(heartbeat.timestamp(), deserialized.timestamp());
+        assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias(),
+                "Failure on heartbeat sourceClusterAlias serde");
+        assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias(),
+                "Failure on heartbeat targetClusterAlias serde");
+        assertEquals(heartbeat.timestamp(), deserialized.timestamp(),
+                "Failure on heartbeat timestamp serde");
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 3c8453c..1391e76 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -38,6 +38,8 @@ import static org.mockito.Mockito.spy;
 
 public class MirrorCheckpointConnectorTest {
 
+    private static final String CONSUMER_GROUP = "consumer-group-1";
+
     @Test
     public void testMirrorCheckpointConnectorDisabled() {
         // disable the checkpoint emission
@@ -45,13 +47,13 @@ public class MirrorCheckpointConnectorTest {
             makeProps("emit.checkpoints.enabled", "false"));
 
         List<String> knownConsumerGroups = new ArrayList<>();
-        knownConsumerGroups.add("consumer-group-1");
+        knownConsumerGroups.add(CONSUMER_GROUP);
         // MirrorCheckpointConnector as minimum to run taskConfig()
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
                                                                             config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect no task will be created
-        assertEquals(0, output.size());
+        assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled");
     }
 
     @Test
@@ -61,14 +63,16 @@ public class MirrorCheckpointConnectorTest {
                 makeProps("emit.checkpoints.enabled", "true"));
 
         List<String> knownConsumerGroups = new ArrayList<>();
-        knownConsumerGroups.add("consumer-group-1");
+        knownConsumerGroups.add(CONSUMER_GROUP);
         // MirrorCheckpointConnector as minimum to run taskConfig()
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
                 config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect 1 task will be created
-        assertEquals(1, output.size());
-        assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
+        assertEquals(1, output.size(),
+                "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " has incorrect size");
+        assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
+                "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed");
     }
 
     @Test
@@ -77,7 +81,7 @@ public class MirrorCheckpointConnectorTest {
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect no task will be created
-        assertEquals(0, output.size());
+        assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
     }
 
     @Test
@@ -86,12 +90,12 @@ public class MirrorCheckpointConnectorTest {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "false"));
 
         List<String> knownConsumerGroups = new ArrayList<>();
-        knownConsumerGroups.add("consumer-group-1");
+        knownConsumerGroups.add(CONSUMER_GROUP);
         // MirrorCheckpointConnector as minimum to run taskConfig()
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect no task will be created
-        assertEquals(0, output.size());
+        assertEquals(0, output.size(), "Replication isn't disabled");
     }
 
     @Test
@@ -100,13 +104,14 @@ public class MirrorCheckpointConnectorTest {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "true"));
 
         List<String> knownConsumerGroups = new ArrayList<>();
-        knownConsumerGroups.add("consumer-group-1");
+        knownConsumerGroups.add(CONSUMER_GROUP);
         // MirrorCheckpointConnector as minimum to run taskConfig()
         MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect 1 task will be created
-        assertEquals(1, output.size());
-        assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
+        assertEquals(1, output.size(), "Replication for consumer-group-1 has incorrect size");
+        assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
+                "Replication for consumer-group-1 failed");
     }
 
     @Test
@@ -123,7 +128,8 @@ public class MirrorCheckpointConnectorTest {
         List<String> groupFound = connector.findConsumerGroups();
 
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
-        assertEquals(expectedGroups, new HashSet<>(groupFound));
+        assertEquals(expectedGroups, new HashSet<>(groupFound),
+                "Expected groups are not the same as findConsumerGroups");
     }
 
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index abd314b..7ef878a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -36,11 +36,14 @@ public class MirrorCheckpointTaskTest {
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
         assertEquals(new TopicPartition("source1.topic3", 4),
-            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)),
+                "Renaming source1.topic3 failed");
         assertEquals(new TopicPartition("topic3", 5),
-            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)),
+                "Renaming target2.topic3 failed");
         assertEquals(new TopicPartition("source1.source6.topic7", 8),
-            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
+            mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)),
+                "Renaming source1.source6.topic7 failed");
     }
 
     @Test
@@ -53,21 +56,33 @@ public class MirrorCheckpointTaskTest {
         Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
             new OffsetAndMetadata(10, null));
         SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
-        assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition());
-        assertEquals("group9", checkpoint1.consumerGroupId());
-        assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
-        assertEquals(10, checkpoint1.upstreamOffset());
-        assertEquals(11, checkpoint1.downstreamOffset());
-        assertEquals(123L, sourceRecord1.timestamp().longValue());
+        assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition(),
+                "checkpoint group9 source1.topic1 failed");
+        assertEquals("group9", checkpoint1.consumerGroupId(),
+                "checkpoint group9 consumerGroupId failed");
+        assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()),
+                "checkpoint group9 sourcePartition failed");
+        assertEquals(10, checkpoint1.upstreamOffset(),
+                "checkpoint group9 upstreamOffset failed");
+        assertEquals(11, checkpoint1.downstreamOffset(),
+                "checkpoint group9 downstreamOffset failed");
+        assertEquals(123L, sourceRecord1.timestamp().longValue(),
+                "checkpoint group9 timestamp failed");
         Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
             new OffsetAndMetadata(12, null));
         SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
-        assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition());
-        assertEquals("group11", checkpoint2.consumerGroupId());
-        assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
-        assertEquals(12, checkpoint2.upstreamOffset());
-        assertEquals(13, checkpoint2.downstreamOffset());
-        assertEquals(234L, sourceRecord2.timestamp().longValue());
+        assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition(),
+                "checkpoint group11 topic5 failed");
+        assertEquals("group11", checkpoint2.consumerGroupId(),
+                "checkpoint group11 consumerGroupId failed");
+        assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()),
+                "checkpoint group11 sourcePartition failed");
+        assertEquals(12, checkpoint2.upstreamOffset(),
+                "checkpoint group11 upstreamOffset failed");
+        assertEquals(13, checkpoint2.downstreamOffset(),
+                "checkpoint group11 downstreamOffset failed");
+        assertEquals(234L, sourceRecord2.timestamp().longValue(),
+                    "checkpoint group11 timestamp failed");
     }
 
     @Test
@@ -118,7 +133,9 @@ public class MirrorCheckpointTaskTest {
 
         Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();
 
-        assertEquals(101, output.get(consumer1).get(t1p0).offset());
-        assertEquals(51, output.get(consumer2).get(t2p0).offset());
+        assertEquals(101, output.get(consumer1).get(t1p0).offset(),
+                "Consumer 1 " + topic1 + " failed");
+        assertEquals(51, output.get(consumer2).get(t2p0).offset(),
+                "Consumer 2 " + topic2 + " failed");
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index f53aa68..7abe30d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -41,7 +41,8 @@ public class MirrorConnectorConfigTest {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
         Map<String, String> props = config.taskConfigForTopicPartitions(topicPartitions);
         MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
-        assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions));
+        assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions),
+                "Setting topic property configuration failed");
     }
 
     @Test
@@ -50,29 +51,36 @@ public class MirrorConnectorConfigTest {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
         Map<String, String> props = config.taskConfigForConsumerGroups(groups);
         MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
-        assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups));
+        assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups),
+                "Setting consumer groups property configuration failed");
     }
 
     @Test
     public void testTopicMatching() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1"));
-        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
-        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"),
+                "topic1 replication property configuration failed");
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"),
+                "topic2 replication property configuration failed");
     }
 
     @Test
     public void testGroupMatching() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("groups", "group1"));
-        assertTrue(config.groupFilter().shouldReplicateGroup("group1"));
-        assertFalse(config.groupFilter().shouldReplicateGroup("group2"));
+        assertTrue(config.groupFilter().shouldReplicateGroup("group1"),
+                "topic1 group matching property configuration failed");
+        assertFalse(config.groupFilter().shouldReplicateGroup("group2"),
+                "topic2 group matching property configuration failed");
     }
 
     @Test
     public void testConfigPropertyMatching() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(
             makeProps("config.properties.exclude", "prop2"));
-        assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
-        assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
+        assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"),
+                "config.properties.exclude incorrectly excluded prop1");
+        assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"),
+                "config.properties.exclude incorrectly included prop2");
     }
 
     @Test
@@ -92,24 +100,26 @@ public class MirrorConnectorConfigTest {
     @Test
     public void testNoTopics() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ""));
-        assertFalse(config.topicFilter().shouldReplicateTopic("topic1"));
-        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
-        assertFalse(config.topicFilter().shouldReplicateTopic(""));
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 shouldn't exist");
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 shouldn't exist");
+        assertFalse(config.topicFilter().shouldReplicateTopic(""), "Empty topic shouldn't exist");
     }
 
     @Test
     public void testAllTopics() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ".*"));
-        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
-        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"),
+                "topic1 created from wildcard should exist");
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"),
+                "topic2 created from wildcard should exist");
     }
 
     @Test
     public void testListOfTopics() {
         MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1, topic2"));
-        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
-        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
-        assertFalse(config.topicFilter().shouldReplicateTopic("topic3"));
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 created from list should exist");
+        assertTrue(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 created from list should exist");
+        assertFalse(config.topicFilter().shouldReplicateTopic("topic3"), "topic3 created from list should exist");
     }
 
     @Test
@@ -156,7 +166,8 @@ public class MirrorConnectorConfigTest {
         connectorConsumerProps = config.sourceConsumerConfig();
         expectedConsumerProps.put("auto.offset.reset", "latest");
         expectedConsumerProps.remove("max.poll.interval.ms");
-        assertEquals(expectedConsumerProps, connectorConsumerProps);
+        assertEquals(expectedConsumerProps, connectorConsumerProps,
+                MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + " source consumer config not matching");
     }
 
     @Test
@@ -172,7 +183,8 @@ public class MirrorConnectorConfigTest {
         expectedConsumerProps.put("enable.auto.commit", "false");
         expectedConsumerProps.put("auto.offset.reset", "latest");
         expectedConsumerProps.put("max.poll.interval.ms", "100");
-        assertEquals(expectedConsumerProps, connectorConsumerProps);
+        assertEquals(expectedConsumerProps, connectorConsumerProps,
+                prefix + " source consumer config not matching");
     }
 
     @Test
@@ -184,7 +196,8 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
         Map<String, Object> expectedProducerProps = new HashMap<>();
         expectedProducerProps.put("acks", "1");
-        assertEquals(expectedProducerProps, connectorProducerProps);
+        assertEquals(expectedProducerProps, connectorProducerProps,
+                MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX  + " source product config not matching");
     }
 
     @Test
@@ -195,7 +208,8 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
         Map<String, Object> expectedProducerProps = new HashMap<>();
         expectedProducerProps.put("acks", "1");
-        assertEquals(expectedProducerProps, connectorProducerProps);
+        assertEquals(expectedProducerProps, connectorProducerProps,
+                prefix + " source producer config not matching");
     }
 
     @Test
@@ -208,7 +222,8 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
         Map<String, Object> expectedAdminProps = new HashMap<>();
         expectedAdminProps.put("connections.max.idle.ms", "10000");
-        assertEquals(expectedAdminProps, connectorAdminProps);
+        assertEquals(expectedAdminProps, connectorAdminProps,
+                MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " source connector admin props not matching");
     }
 
     @Test
@@ -219,7 +234,7 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
         Map<String, Object> expectedAdminProps = new HashMap<>();
         expectedAdminProps.put("connections.max.idle.ms", "10000");
-        assertEquals(expectedAdminProps, connectorAdminProps);
+        assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
     @Test
@@ -232,7 +247,8 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorAdminProps = config.targetAdminConfig();
         Map<String, Object> expectedAdminProps = new HashMap<>();
         expectedAdminProps.put("connections.max.idle.ms", "10000");
-        assertEquals(expectedAdminProps, connectorAdminProps);
+        assertEquals(expectedAdminProps, connectorAdminProps,
+                MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " target connector admin props not matching");
     }
 
     @Test
@@ -243,7 +259,7 @@ public class MirrorConnectorConfigTest {
         Map<String, Object> connectorAdminProps = config.targetAdminConfig();
         Map<String, Object> expectedAdminProps = new HashMap<>();
         expectedAdminProps.put("connections.max.idle.ms", "10000");
-        assertEquals(expectedAdminProps, connectorAdminProps);
+        assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
     }
 
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
index b48c469..ec06919 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
@@ -34,7 +34,7 @@ public class MirrorHeartBeatConnectorTest {
         MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect no task will be created
-        assertEquals(0, output.size());
+        assertEquals(0, output.size(), "Expected task to not be created");
     }
 
     @Test
@@ -47,6 +47,6 @@ public class MirrorHeartBeatConnectorTest {
         MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config);
         List<Map<String, String>> output = connector.taskConfigs(1);
         // expect one task will be created, even the replication is disabled
-        assertEquals(1, output.size());
+        assertEquals(1, output.size(), "Task should have been created even with replication disabled");
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
index d4f96e7..39fd6df 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
@@ -34,7 +34,9 @@ public class MirrorHeartbeatTaskTest {
         List<SourceRecord> records = heartbeatTask.poll();
         assertEquals(1, records.size());
         Map<String, ?> sourcePartition = records.iterator().next().sourcePartition();
-        assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource");
-        assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget");
+        assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource",
+                "sourcePartition's " + Heartbeat.SOURCE_CLUSTER_ALIAS_KEY + " record was not created");
+        assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget",
+                "sourcePartition's " + Heartbeat.TARGET_CLUSTER_ALIAS_KEY + " record was not created");
     }
-}
\ No newline at end of file
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index f5fe2c3..4787ecd 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -137,13 +137,13 @@ public class MirrorMakerConfigTest {
         MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
         assertEquals(100, (int) connectorConfig.getInt("tasks.max"),
             "Connector properties like tasks.max should be passed through to underlying Connectors.");
-        assertEquals(Arrays.asList("topic-1"), connectorConfig.getList("topics"),
+        assertEquals(Collections.singletonList("topic-1"), connectorConfig.getList("topics"),
             "Topics include should be passed through to underlying Connectors.");
-        assertEquals(Arrays.asList("group-2"), connectorConfig.getList("groups"),
+        assertEquals(Collections.singletonList("group-2"), connectorConfig.getList("groups"),
             "Groups include should be passed through to underlying Connectors.");
-        assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"),
+        assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"),
             "Config properties exclude should be passed through to underlying Connectors.");
-        assertEquals(Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"),
+        assertEquals(Collections.singletonList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"),
             "Metrics reporters should be passed through to underlying Connectors.");
         assertEquals("DefaultTopicFilter", connectorConfig.getClass("topic.filter.class").getSimpleName(),
             "Filters should be passed through to underlying Connectors.");
@@ -168,13 +168,13 @@ public class MirrorMakerConfigTest {
         DefaultTopicFilter.TopicFilterConfig filterConfig =
             new DefaultTopicFilter.TopicFilterConfig(connectorProps);
 
-        assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+        assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
             "Topics exclude should be backwards compatible.");
 
-        assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"),
+        assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"),
             "Groups exclude should be backwards compatible.");
 
-        assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"),
+        assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"),
             "Config properties exclude should be backwards compatible.");
 
     }
@@ -193,10 +193,10 @@ public class MirrorMakerConfigTest {
         DefaultTopicFilter.TopicFilterConfig filterConfig =
             new DefaultTopicFilter.TopicFilterConfig(connectorProps);
 
-        assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+        assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
             "Topics exclude should be backwards compatible.");
 
-        assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"),
+        assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"),
             "Groups exclude should be backwards compatible.");
     }
 
@@ -213,7 +213,7 @@ public class MirrorMakerConfigTest {
             new DefaultTopicFilter.TopicFilterConfig(connectorProps);
         assertEquals(Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"),
             "source->target.topics should be passed through to TopicFilters.");
-        assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+        assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
             "source->target.topics.exclude should be passed through to TopicFilters.");
     }
 
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 42d7951..68d149c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -167,13 +167,13 @@ public class MirrorSourceConnectorTest {
         // t3 -> [t0p2, t0p5, t1p0, t2p1]
 
         Map<String, String> t1 = output.get(0);
-        assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS));
+        assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS), "Config for t1 is incorrect");
 
         Map<String, String> t2 = output.get(1);
-        assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS));
+        assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS), "Config for t2 is incorrect");
 
         Map<String, String> t3 = output.get(2);
-        assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS));
+        assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS), "Config for t3 is incorrect");
     }
 
     @Test
@@ -201,7 +201,7 @@ public class MirrorSourceConnectorTest {
         Map<String, Long> expectedPartitionCounts = new HashMap<>();
         expectedPartitionCounts.put("source.topic", 1L);
         Map<String, String> configMap = MirrorSourceConnector.configToMap(topicConfig);
-        assertEquals(2, configMap.size());
+        assertEquals(2, configMap.size(), "configMap has incorrect size");
 
         Map<String, NewTopic> expectedNewTopics = new HashMap<>();
         expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap));
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 9cf09f8..feb2f7f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -54,15 +54,22 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
                 new DefaultReplicationPolicy(), 50);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
-        assertEquals("cluster7.topic1", sourceRecord.topic());
-        assertEquals(2, sourceRecord.kafkaPartition().intValue());
-        assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()));
-        assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue());
-        assertEquals(4L, sourceRecord.timestamp().longValue());
-        assertEquals(key, sourceRecord.key());
-        assertEquals(value, sourceRecord.value());
-        assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value());
-        assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value());
+        assertEquals("cluster7.topic1", sourceRecord.topic(),
+                "Failure on cluster7.topic1 consumerRecord serde");
+        assertEquals(2, sourceRecord.kafkaPartition().intValue(),
+                "sourceRecord kafka partition is incorrect");
+        assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()),
+                "topic1 unwrapped from sourcePartition is incorrect");
+        assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue(),
+                "sourceRecord's sourceOffset is incorrect");
+        assertEquals(4L, sourceRecord.timestamp().longValue(),
+                "sourceRecord's timestamp is incorrect");
+        assertEquals(key, sourceRecord.key(), "sourceRecord's key is incorrect");
+        assertEquals(value, sourceRecord.value(), "sourceRecord's value is incorrect");
+        assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value(),
+                "sourceRecord's header1 is incorrect");
+        assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value(),
+                "sourceRecord's header2 is incorrect");
     }
 
     @Test
@@ -86,16 +93,16 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0);
 
         // if max offset lag is zero, should always emit offset syncs
-        assertTrue(partitionState.update(0, 100));
-        assertTrue(partitionState.update(2, 102));
-        assertTrue(partitionState.update(3, 153));
-        assertTrue(partitionState.update(4, 154));
-        assertTrue(partitionState.update(5, 155));
-        assertTrue(partitionState.update(6, 207));
-        assertTrue(partitionState.update(2, 208));
-        assertTrue(partitionState.update(3, 209));
-        assertTrue(partitionState.update(4, 3));
-        assertTrue(partitionState.update(5, 4));
+        assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+        assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+        assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+        assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+        assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+        assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+        assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+        assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+        assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+        assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
     }
 
     @Test
@@ -134,13 +141,16 @@ public class MirrorSourceTaskTest {
         for (int i = 0; i < sourceRecords.size(); i++) {
             SourceRecord sourceRecord = sourceRecords.get(i);
             ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecordsList.get(i);
-            assertEquals(consumerRecord.key(), sourceRecord.key());
-            assertEquals(consumerRecord.value(), sourceRecord.value());
+            assertEquals(consumerRecord.key(), sourceRecord.key(),
+                    "consumerRecord key does not equal sourceRecord key");
+            assertEquals(consumerRecord.value(), sourceRecord.value(),
+                    "consumerRecord value does not equal sourceRecord value");
             // We expect that the topicname will be based on the replication policy currently used
             assertEquals(replicationPolicy.formatRemoteTopic(sourceClusterName, topicName),
-                    sourceRecord.topic());
+                    sourceRecord.topic(), "topicName not the same as the current replicationPolicy");
             // We expect that MirrorMaker will keep the same partition assignment
-            assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue());
+            assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue(),
+                    "partition assignment not the same as the current replicationPolicy");
             // Check header values
             List<Header> expectedHeaders = new ArrayList<>();
             consumerRecord.headers().forEach(expectedHeaders::add);
@@ -155,8 +165,10 @@ public class MirrorSourceTaskTest {
         for (int i = 0; i < expectedHeaders.size(); i++) {
             Header expectedHeader = expectedHeaders.get(i);
             org.apache.kafka.connect.header.Header taskHeader = taskHeaders.get(i);
-            assertEquals(expectedHeader.key(), taskHeader.key());
-            assertEquals(expectedHeader.value(), taskHeader.value());
+            assertEquals(expectedHeader.key(), taskHeader.key(),
+                    "taskHeader's key expected to equal " + taskHeader.key());
+            assertEquals(expectedHeader.value(), taskHeader.value(),
+                    "taskHeader's value expected to equal " + taskHeader.value().toString());
         }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 18c9b73..9307c60 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -47,21 +47,26 @@ public class OffsetSyncStoreTest {
         FakeOffsetSyncStore store = new FakeOffsetSyncStore();
 
         store.sync(tp, 100, 200);
-        assertEquals(store.translateDownstream(tp, 150), 250);
+        assertEquals(store.translateDownstream(tp, 150), 250,
+                "Failure in translating downstream offset 250");
 
         // Translate exact offsets
         store.sync(tp, 150, 251);
-        assertEquals(store.translateDownstream(tp, 150), 251);
+        assertEquals(store.translateDownstream(tp, 150), 251,
+                "Failure in translating exact downstream offset 251");
 
         // Use old offset (5) prior to any sync -> can't translate
-        assertEquals(-1, store.translateDownstream(tp, 5));
+        assertEquals(-1, store.translateDownstream(tp, 5),
+                "Expected old offset to not translate");
 
         // Downstream offsets reset
         store.sync(tp, 200, 10);
-        assertEquals(store.translateDownstream(tp, 200), 10);
+        assertEquals(store.translateDownstream(tp, 200), 10,
+                "Failure in resetting translation of downstream offset");
 
         // Upstream offsets reset
         store.sync(tp, 20, 20);
-        assertEquals(store.translateDownstream(tp, 20), 20);
+        assertEquals(store.translateDownstream(tp, 20), 20,
+                "Failure in resetting translation of upstream offset");
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
index 33f3ab0..dc7efe2 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
@@ -32,8 +32,11 @@ public class OffsetSyncTest {
         byte[] value = offsetSync.recordValue();
         ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
         OffsetSync deserialized = OffsetSync.deserializeRecord(record);
-        assertEquals(offsetSync.topicPartition(), deserialized.topicPartition());
-        assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset());
-        assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset());
+        assertEquals(offsetSync.topicPartition(), deserialized.topicPartition(),
+                "Failure on offset sync topic partition serde");
+        assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset(),
+                "Failure on upstream offset serde");
+        assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset(),
+                "Failure on downstream offset serde");
     }
 }