You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/05/01 16:42:39 UTC
[kafka] 02/03: MINOR: Refactor Mirror integration tests to reduce duplication (#13428)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c7347c266f82c977500c76ce22309983e66e7ef0
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Mar 24 08:18:26 2023 -0700
MINOR: Refactor Mirror integration tests to reduce duplication (#13428)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../IdentityReplicationIntegrationTest.java | 213 +--------------------
.../MirrorConnectorsIntegrationBaseTest.java | 166 +++++++++-------
...rsWithCustomForwardingAdminIntegrationTest.java | 2 -
3 files changed, 96 insertions(+), 285 deletions(-)
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index f2c8753fe36..8dc04e60747 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -16,28 +16,12 @@
*/
package org.apache.kafka.connect.mirror.integration;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
-import org.apache.kafka.connect.mirror.MirrorClient;
-import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
-import org.apache.kafka.connect.mirror.MirrorMakerConfig;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Tag;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeEach;
/**
@@ -52,208 +36,13 @@ import org.junit.jupiter.api.BeforeEach;
public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
@BeforeEach
public void startClusters() throws Exception {
+ replicateBackupToPrimary = false;
super.startClusters(new HashMap<String, String>() {{
put("replication.policy.class", IdentityReplicationPolicy.class.getName());
put("topics", "test-topic-.*");
- put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
- put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
}});
}
- @Test
- public void testReplication() throws Exception {
- produceMessages(primary, "test-topic-1");
- String consumerGroupName = "consumer-group-testReplication";
- Map<String, Object> consumerProps = new HashMap<String, Object>() {{
- put("group.id", consumerGroupName);
- put("auto.offset.reset", "latest");
- }};
- // warm up consumers before starting the connectors so we don't need to wait for discovery
- warmUpConsumer(consumerProps);
-
- mm2Config = new MirrorMakerConfig(mm2Props);
-
- waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
- waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
-
- MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
- MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
-
- // make sure the topic is auto-created in the other cluster
- waitForTopicCreated(primary, "test-topic-1");
- waitForTopicCreated(backup, "test-topic-1");
- assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
- "topic config was not synced");
- createAndTestNewTopicWithConfigFilter();
-
- assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
- "Records were not produced to primary cluster.");
- assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
- "Records were not replicated to backup cluster.");
-
- assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
- "Heartbeats were not emitted to primary cluster.");
- assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
- "Heartbeats were not emitted to backup cluster.");
- assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
- "Heartbeats were not replicated downstream to backup cluster.");
- assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
- "Heartbeats were not replicated downstream to primary cluster.");
-
- assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
- assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
- assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
- "Checkpoints were not emitted downstream to backup cluster.");
-
- Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
- backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
- // Failover consumer group to backup cluster.
- try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
- primaryConsumer.assign(backupOffsets.keySet());
- backupOffsets.forEach(primaryConsumer::seek);
- primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- primaryConsumer.commitAsync();
-
- assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
- assertTrue(primaryConsumer.position(
- new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
- }
-
- primaryClient.close();
- backupClient.close();
-
- // create more matching topics
- primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-
- // make sure the topic is auto-created in the other cluster
- waitForTopicCreated(backup, "test-topic-2");
-
- // only produce messages to the first partition
- produceMessages(primary, "test-topic-2", 1);
-
- // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
- assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
- "Records were not produced to primary cluster.");
- assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
- "New topic was not replicated to backup cluster.");
- }
-
- @Test
- public void testReplicationWithEmptyPartition() throws Exception {
- String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
- Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
-
- // create topic
- String topic = "test-topic-with-empty-partition";
- primary.kafka().createTopic(topic, NUM_PARTITIONS);
-
- // produce to all test-topic-empty's partitions, except the last partition
- produceMessages(primary, topic, NUM_PARTITIONS - 1);
-
- // consume before starting the connectors so we don't need to wait for discovery
- int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
- try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
- waitForConsumingAllRecords(primaryConsumer, expectedRecords);
- }
-
- // one way replication from primary to backup
- mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
- mm2Config = new MirrorMakerConfig(mm2Props);
- waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
- // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
- Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-
- // note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic
- String backupTopic = topic;
-
- // consume all records from backup cluster
- try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
- backupTopic)) {
- waitForConsumingAllRecords(backupConsumer, expectedRecords);
- }
-
- try (Admin backupClient = backup.kafka().createAdminClient()) {
- // retrieve the consumer group offset from backup cluster
- Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
- backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
-
- // pinpoint the offset of the last partition which does not receive records
- OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
- // offset of the last partition should exist, but its value should be 0
- assertNotNull(offset, "Offset of last partition was not replicated");
- assertEquals(0, offset.offset(), "Offset of last partition is not zero");
- }
- }
-
- @Override
- public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
- produceMessages(primary, "test-topic-1");
- String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
- Map<String, Object> consumerProps = new HashMap<String, Object>() {{
- put("group.id", consumerGroupName);
- put("auto.offset.reset", "earliest");
- }};
- // create consumers before starting the connectors so we don't need to wait for discovery
- try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
- "test-topic-1")) {
- // we need to wait for consuming all the records for MM2 replicating the expected offsets
- waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
- }
-
- // enable automated consumer group offset sync
- mm2Props.put("sync.group.offsets.enabled", "true");
- mm2Props.put("sync.group.offsets.interval.seconds", "1");
- mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
- // one way replication from primary to backup
- mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-
- mm2Config = new MirrorMakerConfig(mm2Props);
-
- waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
- // make sure the topic is created in the backup cluster with same name.
- topicShouldNotBeCreated(primary, "backup.test-topic-1");
- waitForTopicCreated(backup, "test-topic-1");
- // create a consumer at backup cluster with same consumer group Id to consume 1 topic
- try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "test-topic-1")) {
-
- waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
- assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
- }
-
- // now create a new topic in primary cluster
- primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
- // make sure the topic is created in backup cluster
- waitForTopicCreated(backup, "test-topic-2");
-
- // produce some records to the new topic in primary cluster
- produceMessages(primary, "test-topic-2");
-
- // create a consumer at primary cluster to consume the new topic
- try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", consumerGroupName), "test-topic-2")) {
- // we need to wait for consuming all the records for MM2 replicating the expected offsets
- waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
- }
-
- // create a consumer at backup cluster with same consumer group Id to consume old and new topic
- try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
-
- waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
- assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
- }
-
- assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
- }
-
/*
* Returns expected topic name on target cluster.
*/
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index b2950d4f19e..27579d3275f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -115,6 +115,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected Map<String, String> additionalPrimaryClusterClientsConfigs = new HashMap<>();
protected Map<String, String> additionalBackupClusterClientsConfigs = new HashMap<>();
+ protected boolean replicateBackupToPrimary = true;
protected Boolean createReplicatedTopicsUpfront = false; // enable to speed up the test cases
protected Exit.Procedure exitProcedure;
private Exit.Procedure haltProcedure;
@@ -128,8 +129,6 @@ public class MirrorConnectorsIntegrationBaseTest {
public void startClusters() throws Exception {
startClusters(new HashMap<String, String>() {{
put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
- put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
- put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
}});
}
@@ -166,6 +165,8 @@ public class MirrorConnectorsIntegrationBaseTest {
backupBrokerProps.put("auto.create.topics.enable", "false");
mm2Props.putAll(basicMM2Config());
+ mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", Boolean.toString(replicateBackupToPrimary));
mm2Props.putAll(additionalMM2Config);
// exclude topic config:
@@ -255,7 +256,11 @@ public class MirrorConnectorsIntegrationBaseTest {
@Test
public void testReplication() throws Exception {
produceMessages(primary, "test-topic-1");
- produceMessages(backup, "test-topic-1");
+ String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+ if (replicateBackupToPrimary) {
+ produceMessages(backup, "test-topic-1");
+ }
+ String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
String consumerGroupName = "consumer-group-testReplication";
Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
// warm up consumers before starting the connectors so we don't need to wait for discovery
@@ -264,51 +269,59 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
- waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
+ List<Class<? extends Connector>> primaryConnectors = replicateBackupToPrimary ? CONNECTOR_LIST : Collections.singletonList(MirrorHeartbeatConnector.class);
+ waitUntilMirrorMakerIsRunning(primary, primaryConnectors, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
// make sure the topic is auto-created in the other cluster
- waitForTopicCreated(primary, "backup.test-topic-1");
- waitForTopicCreated(backup, "primary.test-topic-1");
+ waitForTopicCreated(primary, reverseTopic1);
+ waitForTopicCreated(backup, backupTopic1);
waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
- assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
+ assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG),
"topic config was not synced");
createAndTestNewTopicWithConfigFilter();
assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to primary cluster.");
- assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
+ assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
"Records were not replicated to backup cluster.");
assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
"Records were not produced to backup cluster.");
- assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count(),
- "Records were not replicated to primary cluster.");
-
- assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count(),
- "Primary cluster doesn't have all records from both clusters.");
- assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count(),
- "Backup cluster doesn't have all records from both clusters.");
-
+ if (replicateBackupToPrimary) {
+ assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, reverseTopic1).count(),
+ "Records were not replicated to primary cluster.");
+ assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, reverseTopic1, "test-topic-1").count(),
+ "Primary cluster doesn't have all records from both clusters.");
+ assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, backupTopic1, "test-topic-1").count(),
+ "Backup cluster doesn't have all records from both clusters.");
+ }
+
assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
"Heartbeats were not emitted to primary cluster.");
assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
"Heartbeats were not emitted to backup cluster.");
assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
"Heartbeats were not replicated downstream to backup cluster.");
- assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
- "Heartbeats were not replicated downstream to primary cluster.");
+ if (replicateBackupToPrimary) {
+ assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
+ "Heartbeats were not replicated downstream to primary cluster.");
+ }
assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
- assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
- assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
"Checkpoints were not emitted downstream to backup cluster.");
+ if (replicateBackupToPrimary) {
+ assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
+ assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
+ assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
+ "Checkpoints were not emitted upstream to primary cluster.");
+ }
Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
- backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
+ backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, backupTopic1);
// Failover consumer group to backup cluster.
try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -317,55 +330,61 @@ public class MirrorConnectorsIntegrationBaseTest {
primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
primaryConsumer.commitAsync();
- assertTrue(primaryConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
+ assertTrue(primaryConsumer.position(new TopicPartition(backupTopic1, 0)) > 0, "Consumer failedover to zero offset.");
assertTrue(primaryConsumer.position(
- new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
- assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
- "Checkpoints were not emitted upstream to primary cluster.");
+ new TopicPartition(backupTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
}
- Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
- primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
-
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
primaryClient.close();
backupClient.close();
-
- // Failback consumer group to primary cluster
- try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
- primaryConsumer.assign(primaryOffsets.keySet());
- primaryOffsets.forEach(primaryConsumer::seek);
- primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- primaryConsumer.commitAsync();
- assertTrue(primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset.");
- assertTrue(primaryConsumer.position(
- new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+ if (replicateBackupToPrimary) {
+ Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+ primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, reverseTopic1);
+
+ // Failback consumer group to primary cluster
+ try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
+ primaryConsumer.assign(primaryOffsets.keySet());
+ primaryOffsets.forEach(primaryConsumer::seek);
+ primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ primaryConsumer.commitAsync();
+
+ assertTrue(primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) > 0, "Consumer failedback to zero downstream offset.");
+ assertTrue(primaryConsumer.position(
+ new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+ }
+
}
-
+
// create more matching topics
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
- backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+ String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
// make sure the topic is auto-created in the other cluster
- waitForTopicCreated(backup, "primary.test-topic-2");
- waitForTopicCreated(primary, "backup.test-topic-3");
+ waitForTopicCreated(backup, backupTopic2);
// only produce messages to the first partition
produceMessages(primary, "test-topic-2", 1);
- produceMessages(backup, "test-topic-3", 1);
-
+
// expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
"Records were not produced to primary cluster.");
- assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
- "Records were not produced to backup cluster.");
-
- assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count(),
- "New topic was not replicated to primary cluster.");
- assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count(),
+ assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
"New topic was not replicated to backup cluster.");
+
+ if (replicateBackupToPrimary) {
+ backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+ String reverseTopic3 = remoteTopicName("test-topic-3", BACKUP_CLUSTER_ALIAS);
+ waitForTopicCreated(primary, reverseTopic3);
+ produceMessages(backup, "test-topic-3", 1);
+ assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
+ "Records were not produced to backup cluster.");
+
+ assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, reverseTopic3).count(),
+ "New topic was not replicated to primary cluster.");
+ }
}
@Test
@@ -394,7 +413,7 @@ public class MirrorConnectorsIntegrationBaseTest {
// sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
Thread.sleep(TimeUnit.SECONDS.toMillis(3));
- String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic;
+ String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
// consume all records from backup cluster
try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
@@ -425,8 +444,9 @@ public class MirrorConnectorsIntegrationBaseTest {
testOneWayReplicationWithOffsetSyncs(0);
}
- public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
+ private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
produceMessages(primary, "test-topic-1");
+ String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
put("group.id", consumerGroupName);
@@ -452,13 +472,16 @@ public class MirrorConnectorsIntegrationBaseTest {
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
// make sure the topic is created in the primary cluster only
- topicShouldNotBeCreated(primary, "backup.test-topic-1");
- waitForTopicCreated(backup, "primary.test-topic-1");
+ String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
+ if (!"test-topic-1".equals(reverseTopic1)) {
+ topicShouldNotBeCreated(primary, reverseTopic1);
+ }
+ waitForTopicCreated(backup, backupTopic1);
// create a consumer at backup cluster with same consumer group Id to consume 1 topic
try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
- consumerProps, "primary.test-topic-1")) {
+ consumerProps, backupTopic1)) {
- waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+ waitForConsumerGroupFullSync(backup, Collections.singletonList(backupTopic1),
consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -467,7 +490,8 @@ public class MirrorConnectorsIntegrationBaseTest {
// now create a new topic in primary cluster
primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
// make sure the topic is created in backup cluster
- waitForTopicCreated(backup, "primary.test-topic-2");
+ String remoteTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+ waitForTopicCreated(backup, remoteTopic2);
// produce some records to the new topic in primary cluster
produceMessages(primary, "test-topic-2");
@@ -481,9 +505,9 @@ public class MirrorConnectorsIntegrationBaseTest {
// create a consumer at backup cluster with same consumer group Id to consume old and new topic
try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
- "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+ "group.id", consumerGroupName), backupTopic1, remoteTopic2)) {
- waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+ waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -694,7 +718,7 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
- protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
+ private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) {
for (Class<? extends Connector> connector : connectorClasses) {
connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
}
@@ -717,7 +741,7 @@ public class MirrorConnectorsIntegrationBaseTest {
/*
* delete all topics of the input kafka cluster
*/
- public static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
+ private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
try (final Admin adminClient = cluster.createAdminClient()) {
Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
log.debug("Deleting topics: {} ", topicsToBeDeleted);
@@ -752,7 +776,7 @@ public class MirrorConnectorsIntegrationBaseTest {
/*
* produce messages to the cluster and topic partition less than numPartitions
*/
- protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
+ private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
int cnt = 0;
for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
for (int p = 0; p < numPartitions; p++)
@@ -772,7 +796,7 @@ public class MirrorConnectorsIntegrationBaseTest {
cluster.produce(topic, partition, key, value);
}
- protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+ private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
) throws InterruptedException {
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
@@ -804,7 +828,7 @@ public class MirrorConnectorsIntegrationBaseTest {
* given consumer group, topics and expected number of records, make sure the consumer group
* offsets are eventually synced to the expected offset numbers
*/
- protected static <T> void waitForConsumerGroupFullSync(
+ private static <T> void waitForConsumerGroupFullSync(
EmbeddedConnectCluster connect,
List<String> topics,
String consumerGroupId,
@@ -853,7 +877,7 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
- protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+ private static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
"auto.offset.reset", "earliest"), checkpointTopic)) {
@@ -879,7 +903,7 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
- protected static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
+ private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
// After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
for (TopicPartition tp : records.partitions()) {
@@ -891,7 +915,7 @@ public class MirrorConnectorsIntegrationBaseTest {
/*
* make sure the consumer to consume expected number of records
*/
- protected static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
+ private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
throws InterruptedException {
final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
waitForCondition(() -> {
@@ -904,7 +928,7 @@ public class MirrorConnectorsIntegrationBaseTest {
/*
* MM2 config to use in integration tests
*/
- protected static Map<String, String> basicMM2Config() {
+ private static Map<String, String> basicMM2Config() {
Map<String, String> mm2Props = new HashMap<>();
mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
mm2Props.put("max.tasks", "10");
@@ -944,8 +968,8 @@ public class MirrorConnectorsIntegrationBaseTest {
// This can speed up some test cases
if (createReplicatedTopicsUpfront) {
- primary.kafka().createTopic("backup.test-topic-1", 1, 1, emptyMap, adminClientConfig);
- backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap, adminClientConfig);
+ primary.kafka().createTopic(remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
+ backup.kafka().createTopic(remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
}
}
@@ -966,7 +990,7 @@ public class MirrorConnectorsIntegrationBaseTest {
/*
* making sure the topic isn't created on the cluster
*/
- protected static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
+ private static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
try (final Admin adminClient = cluster.kafka().createAdminClient()) {
waitForCondition(() ->
!adminClient.listTopics().names()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
index b9efda4a4d7..ab149e68753 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
@@ -146,8 +146,6 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
backupWorkerProps.putAll(superUserConfig());
HashMap<String, String> additionalConfig = new HashMap<String, String>(superUserConfig()) {{
- put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
- put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
put(FORWARDING_ADMIN_CLASS, FakeForwardingAdminWithLocalMetadata.class.getName());
}};