Posted to commits@kafka.apache.org by ce...@apache.org on 2023/05/01 16:42:39 UTC

[kafka] 02/03: MINOR: Refactor Mirror integration tests to reduce duplication (#13428)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c7347c266f82c977500c76ce22309983e66e7ef0
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Mar 24 08:18:26 2023 -0700

    MINOR: Refactor Mirror integration tests to reduce duplication (#13428)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../IdentityReplicationIntegrationTest.java        | 213 +--------------------
 .../MirrorConnectorsIntegrationBaseTest.java       | 166 +++++++++-------
 ...rsWithCustomForwardingAdminIntegrationTest.java |   2 -
 3 files changed, 96 insertions(+), 285 deletions(-)
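
The core of the refactor, visible in the hunks below: the base test class gains a protected replicateBackupToPrimary flag and takes over the two "x->y.enabled" flow properties, so subclasses flip one field instead of re-declaring MM2 config in their own startClusters overrides. A condensed sketch of the pattern, with class names simplified and the embedded Kafka/Connect cluster startup omitted (the JUnit annotations and startup logic of the real tests are assumed):

    import java.util.HashMap;
    import java.util.Map;

    class BaseMirrorTest {
        protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
        protected static final String BACKUP_CLUSTER_ALIAS = "backup";

        // Subclasses flip this before delegating to startClusters() instead of
        // putting backup->primary.enabled into the config map themselves.
        protected boolean replicateBackupToPrimary = true;

        protected final Map<String, String> mm2Props = new HashMap<>();

        public void startClusters(Map<String, String> additionalMM2Config) {
            // The base class now owns both flow toggles...
            mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
            mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled",
                    Boolean.toString(replicateBackupToPrimary));
            // ...and subclasses still layer their own settings on top.
            mm2Props.putAll(additionalMM2Config);
        }
    }

    class IdentityReplicationTest extends BaseMirrorTest {
        // Mirrors the subclass @BeforeEach in the diff: flip the flag, then delegate.
        public void startClusters() {
            replicateBackupToPrimary = false; // one-way: primary -> backup only
            Map<String, String> overrides = new HashMap<>();
            overrides.put("replication.policy.class",
                    "org.apache.kafka.connect.mirror.IdentityReplicationPolicy");
            overrides.put("topics", "test-topic-.*");
            startClusters(overrides);
        }
    }

Everything downstream of the flag (which connectors to wait for on the primary cluster, whether to run the reverse-replication and failback assertions) can then branch on that one field, which is what the testReplication changes below do.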

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index f2c8753fe36..8dc04e60747 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -16,28 +16,12 @@
  */
 package org.apache.kafka.connect.mirror.integration;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
-import org.apache.kafka.connect.mirror.MirrorClient;
-import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
-import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.Tag;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
 
 /**
@@ -52,208 +36,13 @@ import org.junit.jupiter.api.BeforeEach;
 public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
     @BeforeEach
     public void startClusters() throws Exception {
+        replicateBackupToPrimary = false;
         super.startClusters(new HashMap<String, String>() {{
                 put("replication.policy.class", IdentityReplicationPolicy.class.getName());
                 put("topics", "test-topic-.*");
-                put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-                put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
             }});
     }
 
-    @Test
-    public void testReplication() throws Exception {
-        produceMessages(primary, "test-topic-1");
-        String consumerGroupName = "consumer-group-testReplication";
-        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
-                put("group.id", consumerGroupName);
-                put("auto.offset.reset", "latest");
-            }};
-        // warm up consumers before starting the connectors so we don't need to wait for discovery
-        warmUpConsumer(consumerProps);
-
-        mm2Config = new MirrorMakerConfig(mm2Props);
-
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
-
-        MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
-        MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
-
-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, "test-topic-1");
-        waitForTopicCreated(backup, "test-topic-1");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
-                "topic config was not synced");
-        createAndTestNewTopicWithConfigFilter();
-
-        assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
-                "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
-                "Records were not replicated to backup cluster.");
-
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not emitted to primary cluster.");
-        assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not emitted to backup cluster.");
-        assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
-                "Heartbeats were not replicated downstream to backup cluster.");
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not replicated downstream to primary cluster.");
-
-        assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
-        assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
-        assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
-                "Checkpoints were not emitted downstream to backup cluster.");
-
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
-                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Failover consumer group to backup cluster.
-        try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
-            primaryConsumer.assign(backupOffsets.keySet());
-            backupOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            primaryConsumer.commitAsync();
-
-            assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
-            assertTrue(primaryConsumer.position(
-                    new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
-        }
-
-        primaryClient.close();
-        backupClient.close();
-
-        // create more matching topics
-        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-
-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(backup, "test-topic-2");
-
-        // only produce messages to the first partition
-        produceMessages(primary, "test-topic-2", 1);
-
-        // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
-        assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
-                "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
-                "New topic was not replicated to backup cluster.");
-    }
-
-    @Test
-    public void testReplicationWithEmptyPartition() throws Exception {
-        String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
-        Map<String, Object> consumerProps  = Collections.singletonMap("group.id", consumerGroupName);
-
-        // create topic
-        String topic = "test-topic-with-empty-partition";
-        primary.kafka().createTopic(topic, NUM_PARTITIONS);
-
-        // produce to all test-topic-empty's partitions, except the last partition
-        produceMessages(primary, topic, NUM_PARTITIONS - 1);
-
-        // consume before starting the connectors so we don't need to wait for discovery
-        int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
-            waitForConsumingAllRecords(primaryConsumer, expectedRecords);
-        }
-
-        // one way replication from primary to backup
-        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-        mm2Config = new MirrorMakerConfig(mm2Props);
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
-        // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
-        Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-
-        // note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic
-        String backupTopic = topic;
-
-        // consume all records from backup cluster
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
-                backupTopic)) {
-            waitForConsumingAllRecords(backupConsumer, expectedRecords);
-        }
-
-        try (Admin backupClient = backup.kafka().createAdminClient()) {
-            // retrieve the consumer group offset from backup cluster
-            Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
-                    backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
-
-            // pinpoint the offset of the last partition which does not receive records
-            OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
-            // offset of the last partition should exist, but its value should be 0
-            assertNotNull(offset, "Offset of last partition was not replicated");
-            assertEquals(0, offset.offset(), "Offset of last partition is not zero");
-        }
-    }
-
-    @Override
-    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
-        produceMessages(primary, "test-topic-1");
-        String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
-        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
-                put("group.id", consumerGroupName);
-                put("auto.offset.reset", "earliest");
-            }};
-        // create consumers before starting the connectors so we don't need to wait for discovery
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
-                "test-topic-1")) {
-            // we need to wait for consuming all the records for MM2 replicating the expected offsets
-            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
-        }
-
-        // enable automated consumer group offset sync
-        mm2Props.put("sync.group.offsets.enabled", "true");
-        mm2Props.put("sync.group.offsets.interval.seconds", "1");
-        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
-        // one way replication from primary to backup
-        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-
-        mm2Config = new MirrorMakerConfig(mm2Props);
-
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
-        // make sure the topic is created in the backup cluster with same name.
-        topicShouldNotBeCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "test-topic-1");
-        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "test-topic-1")) {
-
-            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
-            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
-        }
-
-        // now create a new topic in primary cluster
-        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-        // make sure the topic is created in backup cluster
-        waitForTopicCreated(backup, "test-topic-2");
-
-        // produce some records to the new topic in primary cluster
-        produceMessages(primary, "test-topic-2");
-
-        // create a consumer at primary cluster to consume the new topic
-        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-2")) {
-            // we need to wait for consuming all the records for MM2 replicating the expected offsets
-            waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
-        }
-
-        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
-
-            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
-            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
-        }
-
-        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
-    }
-
     /*
      * Returns expected topic name on target cluster.
      */
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index b2950d4f19e..27579d3275f 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -115,6 +115,7 @@ public class MirrorConnectorsIntegrationBaseTest {
 
     protected Map<String, String> additionalPrimaryClusterClientsConfigs = new HashMap<>();
     protected Map<String, String> additionalBackupClusterClientsConfigs = new HashMap<>();
+    protected boolean replicateBackupToPrimary = true;
     protected Boolean createReplicatedTopicsUpfront = false; // enable to speed up the test cases
     protected Exit.Procedure exitProcedure;
     private Exit.Procedure haltProcedure;
@@ -128,8 +129,6 @@ public class MirrorConnectorsIntegrationBaseTest {
     public void startClusters() throws Exception {
         startClusters(new HashMap<String, String>() {{
                 put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
-                put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
-                put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
             }});
     }
 
@@ -166,6 +165,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         backupBrokerProps.put("auto.create.topics.enable", "false");
 
         mm2Props.putAll(basicMM2Config());
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", Boolean.toString(replicateBackupToPrimary));
         mm2Props.putAll(additionalMM2Config);
 
         // exclude topic config:
@@ -255,7 +256,11 @@ public class MirrorConnectorsIntegrationBaseTest {
     @Test
     public void testReplication() throws Exception {
         produceMessages(primary, "test-topic-1");
-        produceMessages(backup, "test-topic-1");
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        if (replicateBackupToPrimary) {
+            produceMessages(backup, "test-topic-1");
+        }
+        String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need to wait for discovery
@@ -264,51 +269,59 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Config = new MirrorMakerConfig(mm2Props);
 
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); 
+        List<Class<? extends Connector>> primaryConnectors = replicateBackupToPrimary ? CONNECTOR_LIST : Collections.singletonList(MirrorHeartbeatConnector.class);
+        waitUntilMirrorMakerIsRunning(primary, primaryConnectors, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
 
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "primary.test-topic-1");
+        waitForTopicCreated(primary, reverseTopic1);
+        waitForTopicCreated(backup, backupTopic1);
         waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG),
                 "topic config was not synced");
         createAndTestNewTopicWithConfigFilter();
 
         assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
             "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
+        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
             "Records were not replicated to backup cluster.");
         assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
             "Records were not produced to backup cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count(),
-            "Records were not replicated to primary cluster.");
-        
-        assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count(),
-            "Primary cluster doesn't have all records from both clusters.");
-        assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count(),
-            "Backup cluster doesn't have all records from both clusters.");
-        
+        if (replicateBackupToPrimary) {
+            assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, reverseTopic1).count(),
+                    "Records were not replicated to primary cluster.");
+            assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, reverseTopic1, "test-topic-1").count(),
+                "Primary cluster doesn't have all records from both clusters.");
+            assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, backupTopic1, "test-topic-1").count(),
+                "Backup cluster doesn't have all records from both clusters.");
+        }
+
         assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
             "Heartbeats were not emitted to primary cluster.");
         assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
             "Heartbeats were not emitted to backup cluster.");
         assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
             "Heartbeats were not replicated downstream to backup cluster.");
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
-            "Heartbeats were not replicated downstream to primary cluster.");
+        if (replicateBackupToPrimary) {
+            assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
+                    "Heartbeats were not replicated downstream to primary cluster.");
+        }
         
         assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
         assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
-        assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
-        assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
+        if (replicateBackupToPrimary) {
+            assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
+            assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
+            assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
+                    "Checkpoints were not emitted upstream to primary cluster.");
+        }
 
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
-                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, backupTopic1);
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -317,55 +330,61 @@ public class MirrorConnectorsIntegrationBaseTest {
             primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
             primaryConsumer.commitAsync();
 
-            assertTrue(primaryConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
+            assertTrue(primaryConsumer.position(new TopicPartition(backupTopic1, 0)) > 0, "Consumer failedover to zero offset.");
             assertTrue(primaryConsumer.position(
-                new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
-            assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
-                "Checkpoints were not emitted upstream to primary cluster.");
+                new TopicPartition(backupTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
         }
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
-                primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
-
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
  
         primaryClient.close();
         backupClient.close();
-        
-        // Failback consumer group to primary cluster
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
-            primaryConsumer.assign(primaryOffsets.keySet());
-            primaryOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            primaryConsumer.commitAsync();
 
-            assertTrue(primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset.");
-            assertTrue(primaryConsumer.position(
-                new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+        if (replicateBackupToPrimary) {
+            Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+                    primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, reverseTopic1);
+
+            // Failback consumer group to primary cluster
+            try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
+                primaryConsumer.assign(primaryOffsets.keySet());
+                primaryOffsets.forEach(primaryConsumer::seek);
+                primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                primaryConsumer.commitAsync();
+
+                assertTrue(primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) > 0, "Consumer failedback to zero downstream offset.");
+                assertTrue(primaryConsumer.position(
+                        new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+            }
+
         }
-      
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-        backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(backup, "primary.test-topic-2");
-        waitForTopicCreated(primary, "backup.test-topic-3");
+        waitForTopicCreated(backup, backupTopic2);
 
         // only produce messages to the first partition
         produceMessages(primary, "test-topic-2", 1);
-        produceMessages(backup, "test-topic-3", 1);
-        
+
         // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
         assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
             "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
-            "Records were not produced to backup cluster.");
-
-        assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count(),
-            "New topic was not replicated to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count(),
+        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
             "New topic was not replicated to backup cluster.");
+
+        if (replicateBackupToPrimary) {
+            backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+            String reverseTopic3 = remoteTopicName("test-topic-3", BACKUP_CLUSTER_ALIAS);
+            waitForTopicCreated(primary, reverseTopic3);
+            produceMessages(backup, "test-topic-3", 1);
+            assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
+                    "Records were not produced to backup cluster.");
+
+            assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, reverseTopic3).count(),
+                    "New topic was not replicated to primary cluster.");
+        }
     }
     
     @Test
@@ -394,7 +413,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
         Thread.sleep(TimeUnit.SECONDS.toMillis(3));
 
-        String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic;
+        String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
 
         // consume all records from backup cluster
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
@@ -425,8 +444,9 @@ public class MirrorConnectorsIntegrationBaseTest {
         testOneWayReplicationWithOffsetSyncs(0);
     }
 
-    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
+    private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
         produceMessages(primary, "test-topic-1");
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
                 put("group.id", consumerGroupName);
@@ -452,13 +472,16 @@ public class MirrorConnectorsIntegrationBaseTest {
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
 
         // make sure the topic is created in the primary cluster only
-        topicShouldNotBeCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "primary.test-topic-1");
+        String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
+        if (!"test-topic-1".equals(reverseTopic1)) {
+            topicShouldNotBeCreated(primary, reverseTopic1);
+        }
+        waitForTopicCreated(backup, backupTopic1);
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "primary.test-topic-1")) {
+            consumerProps, backupTopic1)) {
 
-            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+            waitForConsumerGroupFullSync(backup, Collections.singletonList(backupTopic1),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -467,7 +490,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         // make sure the topic is created in backup cluster
-        waitForTopicCreated(backup, "primary.test-topic-2");
+        String remoteTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+        waitForTopicCreated(backup, remoteTopic2);
 
         // produce some records to the new topic in primary cluster
         produceMessages(primary, "test-topic-2");
@@ -481,9 +505,9 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+            "group.id", consumerGroupName), backupTopic1, remoteTopic2)) {
 
-            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+            waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -694,7 +718,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
         }
@@ -717,7 +741,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * delete all topics of the input kafka cluster
      */
-    public static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
+    private static void deleteAllTopics(EmbeddedKafkaCluster cluster) throws Exception {
         try (final Admin adminClient = cluster.createAdminClient()) {
             Set<String> topicsToBeDeleted = adminClient.listTopics().names().get();
             log.debug("Deleting topics: {} ", topicsToBeDeleted);
@@ -752,7 +776,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * produce messages to the cluster and topic partition less than numPartitions 
      */
-    protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
@@ -772,7 +796,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         cluster.produce(topic, partition, key, value);
     }
 
-    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+    private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
             MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
     ) throws InterruptedException {
         AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
@@ -804,7 +828,7 @@ public class MirrorConnectorsIntegrationBaseTest {
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupFullSync(
+    private static <T> void waitForConsumerGroupFullSync(
             EmbeddedConnectCluster connect,
             List<String> topics,
             String consumerGroupId,
@@ -853,7 +877,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+    private static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
         TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
         try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
                 "auto.offset.reset", "earliest"), checkpointTopic)) {
@@ -879,7 +903,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
+    private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
         ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         // After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
         for (TopicPartition tp : records.partitions()) {
@@ -891,7 +915,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * make sure the consumer to consume expected number of records
      */
-    protected static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
+    private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
             throws InterruptedException {
         final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
         waitForCondition(() -> {
@@ -904,7 +928,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * MM2 config to use in integration tests
      */
-    protected static Map<String, String> basicMM2Config() {
+    private static Map<String, String> basicMM2Config() {
         Map<String, String> mm2Props = new HashMap<>();
         mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
         mm2Props.put("max.tasks", "10");
@@ -944,8 +968,8 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // This can speed up some test cases
         if (createReplicatedTopicsUpfront) {
-            primary.kafka().createTopic("backup.test-topic-1", 1, 1, emptyMap, adminClientConfig);
-            backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap, adminClientConfig);
+            primary.kafka().createTopic(remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
+            backup.kafka().createTopic(remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
         }
     }
 
@@ -966,7 +990,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * making sure the topic isn't created on the cluster
      */
-    protected static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
+    private static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
         try (final Admin adminClient = cluster.kafka().createAdminClient()) {
             waitForCondition(() ->
                     !adminClient.listTopics().names()
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
index b9efda4a4d7..ab149e68753 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsWithCustomForwardingAdminIntegrationTest.java
@@ -146,8 +146,6 @@ public class MirrorConnectorsWithCustomForwardingAdminIntegrationTest extends Mi
         backupWorkerProps.putAll(superUserConfig());
 
         HashMap<String, String> additionalConfig = new HashMap<String, String>(superUserConfig()) {{
-                put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
-                put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
                 put(FORWARDING_ADMIN_CLASS, FakeForwardingAdminWithLocalMetadata.class.getName());
             }};
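
The other half of the cleanup is that hard-coded downstream names like "primary.test-topic-1" are replaced with calls to remoteTopicName(topic, sourceAlias), the overridable helper described by the javadoc retained in the identity test ("Returns expected topic name on target cluster"). A minimal sketch of the naming difference the helper abstracts over (the behavior matches MM2's two replication policies at their default settings; the helper's exact body in the test base class is assumed):

    class TopicNaming {
        // DefaultReplicationPolicy: remote topics gain a "<sourceAlias>." prefix.
        static String defaultPolicyName(String topic, String sourceAlias) {
            return sourceAlias + "." + topic;   // e.g. "primary.test-topic-1"
        }

        // IdentityReplicationPolicy: topics keep their name on the target cluster.
        static String identityPolicyName(String topic, String sourceAlias) {
            return topic;                       // e.g. "test-topic-1"
        }

        public static void main(String[] args) {
            System.out.println(defaultPolicyName("test-topic-1", "primary"));
            System.out.println(identityPolicyName("test-topic-1", "primary"));
        }
    }

Under identity naming the "reverse" remote name collapses to the source topic's own name, which is why testOneWayReplicationWithOffsetSyncs now guards topicShouldNotBeCreated(...) with !"test-topic-1".equals(reverseTopic1): that check is only meaningful when the active policy actually renames replicated topics.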