You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2021/05/27 06:23:39 UTC
[kafka] branch trunk updated: KAFKA-12819: Add assert messages to
MirrorMaker tests plus other quality of life improvements (#10762)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 56d9482 KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements (#10762)
56d9482 is described below
commit 56d9482462c2aa941b151015499fc59485fe7426
Author: Matthew de Detrich <md...@gmail.com>
AuthorDate: Thu May 27 08:21:45 2021 +0200
KAFKA-12819: Add assert messages to MirrorMaker tests plus other quality of life improvements (#10762)
Reviewers: Ryanne Dolan <ry...@gmail.com>, Konstantine Karantasis <k....@gmail.com>
---
.../kafka/connect/mirror/CheckpointTest.java | 12 ++--
.../apache/kafka/connect/mirror/HeartbeatTest.java | 9 ++-
.../mirror/MirrorCheckpointConnectorTest.java | 30 ++++++----
.../connect/mirror/MirrorCheckpointTaskTest.java | 51 +++++++++++------
.../connect/mirror/MirrorConnectorConfigTest.java | 64 ++++++++++++++--------
.../mirror/MirrorHeartBeatConnectorTest.java | 4 +-
.../connect/mirror/MirrorHeartbeatTaskTest.java | 8 ++-
.../connect/mirror/MirrorMakerConfigTest.java | 20 +++----
.../connect/mirror/MirrorSourceConnectorTest.java | 8 +--
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 62 ++++++++++++---------
.../kafka/connect/mirror/OffsetSyncStoreTest.java | 15 +++--
.../kafka/connect/mirror/OffsetSyncTest.java | 9 ++-
12 files changed, 180 insertions(+), 112 deletions(-)
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
index fd5448c..f008f99 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/CheckpointTest.java
@@ -32,9 +32,13 @@ public class CheckpointTest {
byte[] value = checkpoint.recordValue();
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 7, 8, key, value);
Checkpoint deserialized = Checkpoint.deserializeRecord(record);
- assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId());
- assertEquals(checkpoint.topicPartition(), deserialized.topicPartition());
- assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset());
- assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset());
+ assertEquals(checkpoint.consumerGroupId(), deserialized.consumerGroupId(),
+ "Failure on checkpoint consumerGroupId serde");
+ assertEquals(checkpoint.topicPartition(), deserialized.topicPartition(),
+ "Failure on checkpoint topicPartition serde");
+ assertEquals(checkpoint.upstreamOffset(), deserialized.upstreamOffset(),
+ "Failure on checkpoint upstreamOffset serde");
+ assertEquals(checkpoint.downstreamOffset(), deserialized.downstreamOffset(),
+ "Failure on checkpoint downstreamOffset serde");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
index fb473f7..723b0dc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/HeartbeatTest.java
@@ -31,8 +31,11 @@ public class HeartbeatTest {
byte[] value = heartbeat.recordValue();
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
Heartbeat deserialized = Heartbeat.deserializeRecord(record);
- assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias());
- assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias());
- assertEquals(heartbeat.timestamp(), deserialized.timestamp());
+ assertEquals(heartbeat.sourceClusterAlias(), deserialized.sourceClusterAlias(),
+ "Failure on heartbeat sourceClusterAlias serde");
+ assertEquals(heartbeat.targetClusterAlias(), deserialized.targetClusterAlias(),
+ "Failure on heartbeat targetClusterAlias serde");
+ assertEquals(heartbeat.timestamp(), deserialized.timestamp(),
+ "Failure on heartbeat timestamp serde");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 3c8453c..1391e76 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -38,6 +38,8 @@ import static org.mockito.Mockito.spy;
public class MirrorCheckpointConnectorTest {
+ private static final String CONSUMER_GROUP = "consumer-group-1";
+
@Test
public void testMirrorCheckpointConnectorDisabled() {
// disable the checkpoint emission
@@ -45,13 +47,13 @@ public class MirrorCheckpointConnectorTest {
makeProps("emit.checkpoints.enabled", "false"));
List<String> knownConsumerGroups = new ArrayList<>();
- knownConsumerGroups.add("consumer-group-1");
+ knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
- assertEquals(0, output.size());
+ assertEquals(0, output.size(), "MirrorCheckpointConnector not disabled");
}
@Test
@@ -61,14 +63,16 @@ public class MirrorCheckpointConnectorTest {
makeProps("emit.checkpoints.enabled", "true"));
List<String> knownConsumerGroups = new ArrayList<>();
- knownConsumerGroups.add("consumer-group-1");
+ knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect 1 task will be created
- assertEquals(1, output.size());
- assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
+ assertEquals(1, output.size(),
+ "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " has incorrect size");
+ assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
+ "MirrorCheckpointConnectorEnabled for " + CONSUMER_GROUP + " failed");
}
@Test
@@ -77,7 +81,7 @@ public class MirrorCheckpointConnectorTest {
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
- assertEquals(0, output.size());
+ assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
}
@Test
@@ -86,12 +90,12 @@ public class MirrorCheckpointConnectorTest {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "false"));
List<String> knownConsumerGroups = new ArrayList<>();
- knownConsumerGroups.add("consumer-group-1");
+ knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
- assertEquals(0, output.size());
+ assertEquals(0, output.size(), "Replication isn't disabled");
}
@Test
@@ -100,13 +104,14 @@ public class MirrorCheckpointConnectorTest {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("enabled", "true"));
List<String> knownConsumerGroups = new ArrayList<>();
- knownConsumerGroups.add("consumer-group-1");
+ knownConsumerGroups.add(CONSUMER_GROUP);
// MirrorCheckpointConnector as minimum to run taskConfig()
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect 1 task will be created
- assertEquals(1, output.size());
- assertEquals("consumer-group-1", output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS));
+ assertEquals(1, output.size(), "Replication for consumer-group-1 has incorrect size");
+ assertEquals(CONSUMER_GROUP, output.get(0).get(MirrorConnectorConfig.TASK_CONSUMER_GROUPS),
+ "Replication for consumer-group-1 failed");
}
@Test
@@ -123,7 +128,8 @@ public class MirrorCheckpointConnectorTest {
List<String> groupFound = connector.findConsumerGroups();
Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
- assertEquals(expectedGroups, new HashSet<>(groupFound));
+ assertEquals(expectedGroups, new HashSet<>(groupFound),
+ "Expected groups are not the same as findConsumerGroups");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index abd314b..7ef878a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -36,11 +36,14 @@ public class MirrorCheckpointTaskTest {
MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
new DefaultReplicationPolicy(), null, Collections.emptyMap(), Collections.emptyMap());
assertEquals(new TopicPartition("source1.topic3", 4),
- mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)));
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("topic3", 4)),
+ "Renaming source1.topic3 failed");
assertEquals(new TopicPartition("topic3", 5),
- mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)));
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("target2.topic3", 5)),
+ "Renaming target2.topic3 failed");
assertEquals(new TopicPartition("source1.source6.topic7", 8),
- mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)));
+ mirrorCheckpointTask.renameTopicPartition(new TopicPartition("source6.topic7", 8)),
+ "Renaming source1.source6.topic7 failed");
}
@Test
@@ -53,21 +56,33 @@ public class MirrorCheckpointTaskTest {
Checkpoint checkpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
new OffsetAndMetadata(10, null));
SourceRecord sourceRecord1 = mirrorCheckpointTask.checkpointRecord(checkpoint1, 123L);
- assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition());
- assertEquals("group9", checkpoint1.consumerGroupId());
- assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()));
- assertEquals(10, checkpoint1.upstreamOffset());
- assertEquals(11, checkpoint1.downstreamOffset());
- assertEquals(123L, sourceRecord1.timestamp().longValue());
+ assertEquals(new TopicPartition("source1.topic1", 2), checkpoint1.topicPartition(),
+ "checkpoint group9 source1.topic1 failed");
+ assertEquals("group9", checkpoint1.consumerGroupId(),
+ "checkpoint group9 consumerGroupId failed");
+ assertEquals("group9", Checkpoint.unwrapGroup(sourceRecord1.sourcePartition()),
+ "checkpoint group9 sourcePartition failed");
+ assertEquals(10, checkpoint1.upstreamOffset(),
+ "checkpoint group9 upstreamOffset failed");
+ assertEquals(11, checkpoint1.downstreamOffset(),
+ "checkpoint group9 downstreamOffset failed");
+ assertEquals(123L, sourceRecord1.timestamp().longValue(),
+ "checkpoint group9 timestamp failed");
Checkpoint checkpoint2 = mirrorCheckpointTask.checkpoint("group11", new TopicPartition("target2.topic5", 6),
new OffsetAndMetadata(12, null));
SourceRecord sourceRecord2 = mirrorCheckpointTask.checkpointRecord(checkpoint2, 234L);
- assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition());
- assertEquals("group11", checkpoint2.consumerGroupId());
- assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()));
- assertEquals(12, checkpoint2.upstreamOffset());
- assertEquals(13, checkpoint2.downstreamOffset());
- assertEquals(234L, sourceRecord2.timestamp().longValue());
+ assertEquals(new TopicPartition("topic5", 6), checkpoint2.topicPartition(),
+ "checkpoint group11 topic5 failed");
+ assertEquals("group11", checkpoint2.consumerGroupId(),
+ "checkpoint group11 consumerGroupId failed");
+ assertEquals("group11", Checkpoint.unwrapGroup(sourceRecord2.sourcePartition()),
+ "checkpoint group11 sourcePartition failed");
+ assertEquals(12, checkpoint2.upstreamOffset(),
+ "checkpoint group11 upstreamOffset failed");
+ assertEquals(13, checkpoint2.downstreamOffset(),
+ "checkpoint group11 downstreamOffset failed");
+ assertEquals(234L, sourceRecord2.timestamp().longValue(),
+ "checkpoint group11 timestamp failed");
}
@Test
@@ -118,7 +133,9 @@ public class MirrorCheckpointTaskTest {
Map<String, Map<TopicPartition, OffsetAndMetadata>> output = mirrorCheckpointTask.syncGroupOffset();
- assertEquals(101, output.get(consumer1).get(t1p0).offset());
- assertEquals(51, output.get(consumer2).get(t2p0).offset());
+ assertEquals(101, output.get(consumer1).get(t1p0).offset(),
+ "Consumer 1 " + topic1 + " failed");
+ assertEquals(51, output.get(consumer2).get(t2p0).offset(),
+ "Consumer 2 " + topic2 + " failed");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index f53aa68..7abe30d 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -41,7 +41,8 @@ public class MirrorConnectorConfigTest {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
Map<String, String> props = config.taskConfigForTopicPartitions(topicPartitions);
MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
- assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions));
+ assertEquals(taskConfig.taskTopicPartitions(), new HashSet<>(topicPartitions),
+ "Setting topic property configuration failed");
}
@Test
@@ -50,29 +51,36 @@ public class MirrorConnectorConfigTest {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
Map<String, String> props = config.taskConfigForConsumerGroups(groups);
MirrorTaskConfig taskConfig = new MirrorTaskConfig(props);
- assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups));
+ assertEquals(taskConfig.taskConsumerGroups(), new HashSet<>(groups),
+ "Setting consumer groups property configuration failed");
}
@Test
public void testTopicMatching() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1"));
- assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
- assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic1"),
+ "topic1 replication property configuration failed");
+ assertFalse(config.topicFilter().shouldReplicateTopic("topic2"),
+ "topic2 replication property configuration failed");
}
@Test
public void testGroupMatching() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("groups", "group1"));
- assertTrue(config.groupFilter().shouldReplicateGroup("group1"));
- assertFalse(config.groupFilter().shouldReplicateGroup("group2"));
+ assertTrue(config.groupFilter().shouldReplicateGroup("group1"),
+ "group1 group matching property configuration failed");
+ assertFalse(config.groupFilter().shouldReplicateGroup("group2"),
+ "group2 group matching property configuration failed");
}
@Test
public void testConfigPropertyMatching() {
MirrorConnectorConfig config = new MirrorConnectorConfig(
makeProps("config.properties.exclude", "prop2"));
- assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"));
- assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"));
+ assertTrue(config.configPropertyFilter().shouldReplicateConfigProperty("prop1"),
+ "config.properties.exclude incorrectly excluded prop1");
+ assertFalse(config.configPropertyFilter().shouldReplicateConfigProperty("prop2"),
+ "config.properties.exclude incorrectly included prop2");
}
@Test
@@ -92,24 +100,26 @@ public class MirrorConnectorConfigTest {
@Test
public void testNoTopics() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ""));
- assertFalse(config.topicFilter().shouldReplicateTopic("topic1"));
- assertFalse(config.topicFilter().shouldReplicateTopic("topic2"));
- assertFalse(config.topicFilter().shouldReplicateTopic(""));
+ assertFalse(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 shouldn't exist");
+ assertFalse(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 shouldn't exist");
+ assertFalse(config.topicFilter().shouldReplicateTopic(""), "Empty topic shouldn't exist");
}
@Test
public void testAllTopics() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", ".*"));
- assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
- assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic1"),
+ "topic1 created from wildcard should exist");
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic2"),
+ "topic2 created from wildcard should exist");
}
@Test
public void testListOfTopics() {
MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps("topics", "topic1, topic2"));
- assertTrue(config.topicFilter().shouldReplicateTopic("topic1"));
- assertTrue(config.topicFilter().shouldReplicateTopic("topic2"));
- assertFalse(config.topicFilter().shouldReplicateTopic("topic3"));
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic1"), "topic1 created from list should exist");
+ assertTrue(config.topicFilter().shouldReplicateTopic("topic2"), "topic2 created from list should exist");
+ assertFalse(config.topicFilter().shouldReplicateTopic("topic3"), "topic3 excluded from list shouldn't exist");
}
@Test
@@ -156,7 +166,8 @@ public class MirrorConnectorConfigTest {
connectorConsumerProps = config.sourceConsumerConfig();
expectedConsumerProps.put("auto.offset.reset", "latest");
expectedConsumerProps.remove("max.poll.interval.ms");
- assertEquals(expectedConsumerProps, connectorConsumerProps);
+ assertEquals(expectedConsumerProps, connectorConsumerProps,
+ MirrorConnectorConfig.CONSUMER_CLIENT_PREFIX + " source consumer config not matching");
}
@Test
@@ -172,7 +183,8 @@ public class MirrorConnectorConfigTest {
expectedConsumerProps.put("enable.auto.commit", "false");
expectedConsumerProps.put("auto.offset.reset", "latest");
expectedConsumerProps.put("max.poll.interval.ms", "100");
- assertEquals(expectedConsumerProps, connectorConsumerProps);
+ assertEquals(expectedConsumerProps, connectorConsumerProps,
+ prefix + " source consumer config not matching");
}
@Test
@@ -184,7 +196,8 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
Map<String, Object> expectedProducerProps = new HashMap<>();
expectedProducerProps.put("acks", "1");
- assertEquals(expectedProducerProps, connectorProducerProps);
+ assertEquals(expectedProducerProps, connectorProducerProps,
+ MirrorConnectorConfig.PRODUCER_CLIENT_PREFIX + " source producer config not matching");
}
@Test
@@ -195,7 +208,8 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorProducerProps = config.sourceProducerConfig();
Map<String, Object> expectedProducerProps = new HashMap<>();
expectedProducerProps.put("acks", "1");
- assertEquals(expectedProducerProps, connectorProducerProps);
+ assertEquals(expectedProducerProps, connectorProducerProps,
+ prefix + " source producer config not matching");
}
@Test
@@ -208,7 +222,8 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
- assertEquals(expectedAdminProps, connectorAdminProps);
+ assertEquals(expectedAdminProps, connectorAdminProps,
+ MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " source connector admin props not matching");
}
@Test
@@ -219,7 +234,7 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorAdminProps = config.sourceAdminConfig();
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
- assertEquals(expectedAdminProps, connectorAdminProps);
+ assertEquals(expectedAdminProps, connectorAdminProps, prefix + " source connector admin props not matching");
}
@Test
@@ -232,7 +247,8 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorAdminProps = config.targetAdminConfig();
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
- assertEquals(expectedAdminProps, connectorAdminProps);
+ assertEquals(expectedAdminProps, connectorAdminProps,
+ MirrorConnectorConfig.ADMIN_CLIENT_PREFIX + " target connector admin props not matching");
}
@Test
@@ -243,7 +259,7 @@ public class MirrorConnectorConfigTest {
Map<String, Object> connectorAdminProps = config.targetAdminConfig();
Map<String, Object> expectedAdminProps = new HashMap<>();
expectedAdminProps.put("connections.max.idle.ms", "10000");
- assertEquals(expectedAdminProps, connectorAdminProps);
+ assertEquals(expectedAdminProps, connectorAdminProps, prefix + " target connector admin props not matching");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
index b48c469..ec06919 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartBeatConnectorTest.java
@@ -34,7 +34,7 @@ public class MirrorHeartBeatConnectorTest {
MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect no task will be created
- assertEquals(0, output.size());
+ assertEquals(0, output.size(), "Expected task to not be created");
}
@Test
@@ -47,6 +47,6 @@ public class MirrorHeartBeatConnectorTest {
MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config);
List<Map<String, String>> output = connector.taskConfigs(1);
// expect one task will be created, even the replication is disabled
- assertEquals(1, output.size());
+ assertEquals(1, output.size(), "Task should have been created even with replication disabled");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
index d4f96e7..39fd6df 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorHeartbeatTaskTest.java
@@ -34,7 +34,9 @@ public class MirrorHeartbeatTaskTest {
List<SourceRecord> records = heartbeatTask.poll();
assertEquals(1, records.size());
Map<String, ?> sourcePartition = records.iterator().next().sourcePartition();
- assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource");
- assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget");
+ assertEquals(sourcePartition.get(Heartbeat.SOURCE_CLUSTER_ALIAS_KEY), "testSource",
+ "sourcePartition's " + Heartbeat.SOURCE_CLUSTER_ALIAS_KEY + " record was not created");
+ assertEquals(sourcePartition.get(Heartbeat.TARGET_CLUSTER_ALIAS_KEY), "testTarget",
+ "sourcePartition's " + Heartbeat.TARGET_CLUSTER_ALIAS_KEY + " record was not created");
}
-}
\ No newline at end of file
+}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
index f5fe2c3..4787ecd 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java
@@ -137,13 +137,13 @@ public class MirrorMakerConfigTest {
MirrorConnectorConfig connectorConfig = new MirrorConnectorConfig(connectorProps);
assertEquals(100, (int) connectorConfig.getInt("tasks.max"),
"Connector properties like tasks.max should be passed through to underlying Connectors.");
- assertEquals(Arrays.asList("topic-1"), connectorConfig.getList("topics"),
+ assertEquals(Collections.singletonList("topic-1"), connectorConfig.getList("topics"),
"Topics include should be passed through to underlying Connectors.");
- assertEquals(Arrays.asList("group-2"), connectorConfig.getList("groups"),
+ assertEquals(Collections.singletonList("group-2"), connectorConfig.getList("groups"),
"Groups include should be passed through to underlying Connectors.");
- assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"),
+ assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"),
"Config properties exclude should be passed through to underlying Connectors.");
- assertEquals(Arrays.asList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"),
+ assertEquals(Collections.singletonList("FakeMetricsReporter"), connectorConfig.getList("metric.reporters"),
"Metrics reporters should be passed through to underlying Connectors.");
assertEquals("DefaultTopicFilter", connectorConfig.getClass("topic.filter.class").getSimpleName(),
"Filters should be passed through to underlying Connectors.");
@@ -168,13 +168,13 @@ public class MirrorMakerConfigTest {
DefaultTopicFilter.TopicFilterConfig filterConfig =
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
- assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+ assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
"Topics exclude should be backwards compatible.");
- assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"),
+ assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"),
"Groups exclude should be backwards compatible.");
- assertEquals(Arrays.asList("property-3"), connectorConfig.getList("config.properties.exclude"),
+ assertEquals(Collections.singletonList("property-3"), connectorConfig.getList("config.properties.exclude"),
"Config properties exclude should be backwards compatible.");
}
@@ -193,10 +193,10 @@ public class MirrorMakerConfigTest {
DefaultTopicFilter.TopicFilterConfig filterConfig =
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
- assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+ assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
"Topics exclude should be backwards compatible.");
- assertEquals(Arrays.asList("group-7"), connectorConfig.getList("groups.exclude"),
+ assertEquals(Collections.singletonList("group-7"), connectorConfig.getList("groups.exclude"),
"Groups exclude should be backwards compatible.");
}
@@ -213,7 +213,7 @@ public class MirrorMakerConfigTest {
new DefaultTopicFilter.TopicFilterConfig(connectorProps);
assertEquals(Arrays.asList("topic1", "topic2"), filterConfig.getList("topics"),
"source->target.topics should be passed through to TopicFilters.");
- assertEquals(Arrays.asList("topic3"), filterConfig.getList("topics.exclude"),
+ assertEquals(Collections.singletonList("topic3"), filterConfig.getList("topics.exclude"),
"source->target.topics.exclude should be passed through to TopicFilters.");
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 42d7951..68d149c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -167,13 +167,13 @@ public class MirrorSourceConnectorTest {
// t3 -> [t0p2, t0p5, t1p0, t2p1]
Map<String, String> t1 = output.get(0);
- assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS));
+ assertEquals("t0-0,t0-3,t0-6,t1-1", t1.get(TASK_TOPIC_PARTITIONS), "Config for t1 is incorrect");
Map<String, String> t2 = output.get(1);
- assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS));
+ assertEquals("t0-1,t0-4,t0-7,t2-0", t2.get(TASK_TOPIC_PARTITIONS), "Config for t2 is incorrect");
Map<String, String> t3 = output.get(2);
- assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS));
+ assertEquals("t0-2,t0-5,t1-0,t2-1", t3.get(TASK_TOPIC_PARTITIONS), "Config for t3 is incorrect");
}
@Test
@@ -201,7 +201,7 @@ public class MirrorSourceConnectorTest {
Map<String, Long> expectedPartitionCounts = new HashMap<>();
expectedPartitionCounts.put("source.topic", 1L);
Map<String, String> configMap = MirrorSourceConnector.configToMap(topicConfig);
- assertEquals(2, configMap.size());
+ assertEquals(2, configMap.size(), "configMap has incorrect size");
Map<String, NewTopic> expectedNewTopics = new HashMap<>();
expectedNewTopics.put("source.topic", new NewTopic("source.topic", 1, (short) 0).configs(configMap));
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 9cf09f8..feb2f7f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -54,15 +54,22 @@ public class MirrorSourceTaskTest {
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
new DefaultReplicationPolicy(), 50);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
- assertEquals("cluster7.topic1", sourceRecord.topic());
- assertEquals(2, sourceRecord.kafkaPartition().intValue());
- assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()));
- assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue());
- assertEquals(4L, sourceRecord.timestamp().longValue());
- assertEquals(key, sourceRecord.key());
- assertEquals(value, sourceRecord.value());
- assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value());
- assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value());
+ assertEquals("cluster7.topic1", sourceRecord.topic(),
+ "Failure on cluster7.topic1 consumerRecord serde");
+ assertEquals(2, sourceRecord.kafkaPartition().intValue(),
+ "sourceRecord kafka partition is incorrect");
+ assertEquals(new TopicPartition("topic1", 2), MirrorUtils.unwrapPartition(sourceRecord.sourcePartition()),
+ "topic1 unwrapped from sourcePartition is incorrect");
+ assertEquals(3L, MirrorUtils.unwrapOffset(sourceRecord.sourceOffset()).longValue(),
+ "sourceRecord's sourceOffset is incorrect");
+ assertEquals(4L, sourceRecord.timestamp().longValue(),
+ "sourceRecord's timestamp is incorrect");
+ assertEquals(key, sourceRecord.key(), "sourceRecord's key is incorrect");
+ assertEquals(value, sourceRecord.value(), "sourceRecord's value is incorrect");
+ assertEquals(headers.lastHeader("header1").value(), sourceRecord.headers().lastWithName("header1").value(),
+ "sourceRecord's header1 is incorrect");
+ assertEquals(headers.lastHeader("header2").value(), sourceRecord.headers().lastWithName("header2").value(),
+ "sourceRecord's header2 is incorrect");
}
@Test
@@ -86,16 +93,16 @@ public class MirrorSourceTaskTest {
MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(0);
// if max offset lag is zero, should always emit offset syncs
- assertTrue(partitionState.update(0, 100));
- assertTrue(partitionState.update(2, 102));
- assertTrue(partitionState.update(3, 153));
- assertTrue(partitionState.update(4, 154));
- assertTrue(partitionState.update(5, 155));
- assertTrue(partitionState.update(6, 207));
- assertTrue(partitionState.update(2, 208));
- assertTrue(partitionState.update(3, 209));
- assertTrue(partitionState.update(4, 3));
- assertTrue(partitionState.update(5, 4));
+ assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+ assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+ assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+ assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+ assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+ assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+ assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+ assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+ assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+ assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
}
@Test
@@ -134,13 +141,16 @@ public class MirrorSourceTaskTest {
for (int i = 0; i < sourceRecords.size(); i++) {
SourceRecord sourceRecord = sourceRecords.get(i);
ConsumerRecord<byte[], byte[]> consumerRecord = consumerRecordsList.get(i);
- assertEquals(consumerRecord.key(), sourceRecord.key());
- assertEquals(consumerRecord.value(), sourceRecord.value());
+ assertEquals(consumerRecord.key(), sourceRecord.key(),
+ "consumerRecord key does not equal sourceRecord key");
+ assertEquals(consumerRecord.value(), sourceRecord.value(),
+ "consumerRecord value does not equal sourceRecord value");
// We expect that the topicname will be based on the replication policy currently used
assertEquals(replicationPolicy.formatRemoteTopic(sourceClusterName, topicName),
- sourceRecord.topic());
+ sourceRecord.topic(), "topicName not the same as the current replicationPolicy");
// We expect that MirrorMaker will keep the same partition assignment
- assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue());
+ assertEquals(consumerRecord.partition(), sourceRecord.kafkaPartition().intValue(),
+ "partition assignment not the same as the current replicationPolicy");
// Check header values
List<Header> expectedHeaders = new ArrayList<>();
consumerRecord.headers().forEach(expectedHeaders::add);
@@ -155,8 +165,10 @@ public class MirrorSourceTaskTest {
for (int i = 0; i < expectedHeaders.size(); i++) {
Header expectedHeader = expectedHeaders.get(i);
org.apache.kafka.connect.header.Header taskHeader = taskHeaders.get(i);
- assertEquals(expectedHeader.key(), taskHeader.key());
- assertEquals(expectedHeader.value(), taskHeader.value());
+ assertEquals(expectedHeader.key(), taskHeader.key(),
+ "taskHeader's key expected to equal " + taskHeader.key());
+ assertEquals(expectedHeader.value(), taskHeader.value(),
+ "taskHeader's value expected to equal " + taskHeader.value().toString());
}
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 18c9b73..9307c60 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -47,21 +47,26 @@ public class OffsetSyncStoreTest {
FakeOffsetSyncStore store = new FakeOffsetSyncStore();
store.sync(tp, 100, 200);
- assertEquals(store.translateDownstream(tp, 150), 250);
+ assertEquals(store.translateDownstream(tp, 150), 250,
+ "Failure in translating downstream offset 250");
// Translate exact offsets
store.sync(tp, 150, 251);
- assertEquals(store.translateDownstream(tp, 150), 251);
+ assertEquals(store.translateDownstream(tp, 150), 251,
+ "Failure in translating exact downstream offset 251");
// Use old offset (5) prior to any sync -> can't translate
- assertEquals(-1, store.translateDownstream(tp, 5));
+ assertEquals(-1, store.translateDownstream(tp, 5),
+ "Expected old offset to not translate");
// Downstream offsets reset
store.sync(tp, 200, 10);
- assertEquals(store.translateDownstream(tp, 200), 10);
+ assertEquals(store.translateDownstream(tp, 200), 10,
+ "Failure in resetting translation of downstream offset");
// Upstream offsets reset
store.sync(tp, 20, 20);
- assertEquals(store.translateDownstream(tp, 20), 20);
+ assertEquals(store.translateDownstream(tp, 20), 20,
+ "Failure in resetting translation of upstream offset");
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
index 33f3ab0..dc7efe2 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncTest.java
@@ -32,8 +32,11 @@ public class OffsetSyncTest {
byte[] value = offsetSync.recordValue();
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>("any-topic", 6, 7, key, value);
OffsetSync deserialized = OffsetSync.deserializeRecord(record);
- assertEquals(offsetSync.topicPartition(), deserialized.topicPartition());
- assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset());
- assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset());
+ assertEquals(offsetSync.topicPartition(), deserialized.topicPartition(),
+ "Failure on offset sync topic partition serde");
+ assertEquals(offsetSync.upstreamOffset(), deserialized.upstreamOffset(),
+ "Failure on upstream offset serde");
+ assertEquals(offsetSync.downstreamOffset(), deserialized.downstreamOffset(),
+ "Failure on downstream offset serde");
}
}