You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/25 04:19:58 UTC

[pulsar] branch master updated: Refactored ReplicatorTest to be more easily debuggable (#3892)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1921bd9  Refactored ReplicatorTest to be more easily debuggable (#3892)
1921bd9 is described below

commit 1921bd98f20991248892717bbcfc102545e7c49c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun Mar 24 21:19:54 2019 -0700

    Refactored ReplicatorTest to be more easily debuggable (#3892)
---
 .../pulsar/broker/service/ReplicatorTest.java      | 429 ++++------
 .../pulsar/broker/service/ReplicatorTestBase.java  |  71 +-
 .../broker/service/v1/V1_ReplicatorTest.java       | 937 ---------------------
 .../broker/service/v1/V1_ReplicatorTestBase.java   | 377 ---------
 4 files changed, 207 insertions(+), 1607 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index a66179b..3150983 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import lombok.Cleanup;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -85,7 +87,7 @@ import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 import io.netty.buffer.ByteBuf;
 
 /**
- * Starts 2 brokers that are in 2 different clusters
+ * Starts 3 brokers that are in 3 different clusters
  */
 public class ReplicatorTest extends ReplicatorTestBase {
 
@@ -113,7 +115,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
 
-    @Test(enabled = true, timeOut = 30000)
+    @Test
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
         // This test is to verify that the config change on global namespace is successfully applied in broker during
@@ -127,18 +129,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 @Override
                 public Void call() throws Exception {
 
+                    @Cleanup
                     MessageProducer producer = new MessageProducer(url1, dest);
                     log.info("--- Starting producer --- " + url1);
 
+                    @Cleanup
                     MessageConsumer consumer = new MessageConsumer(url1, dest);
                     log.info("--- Starting Consumer --- " + url1);
 
                     producer.produce(2);
-
                     consumer.receive(2);
-
-                    producer.close();
-                    consumer.close();
                     return null;
                 }
             }));
@@ -207,6 +207,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         admin1.namespaces().createNamespace(namespace);
         admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
         final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
+
+        @Cleanup
         PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
                 .build();
         Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
@@ -243,220 +245,154 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
                 Mockito.any(Schema.class), eq(null));
 
-        client1.shutdown();
+        executor.shutdown();
     }
 
-    @Test(enabled = false, timeOut = 30000)
-    public void testConfigChangeNegativeCases() throws Exception {
-        log.info("--- Starting ReplicatorTest::testConfigChangeNegativeCases ---");
-        // Negative test cases for global namespace config change. Verify that the namespace config change can not be
-        // updated when the namespace is being unloaded.
-        // Set up field access to internal namespace state in NamespaceService
-        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
-        ownershipCacheField.setAccessible(true);
-        OwnershipCache ownerCache = (OwnershipCache) ownershipCacheField.get(pulsar1.getNamespaceService());
-        Assert.assertNotNull(pulsar1, "pulsar1 is null");
-        Assert.assertNotNull(pulsar1.getNamespaceService(), "pulsar1.getNamespaceService() is null");
-        NamespaceBundle globalNsBundle = pulsar1.getNamespaceService().getNamespaceBundleFactory()
-                .getFullBundle(NamespaceName.get("pulsar/ns"));
-        ownerCache.tryAcquiringOwnership(globalNsBundle);
-        Assert.assertNotNull(ownerCache.getOwnedBundle(globalNsBundle),
-                "pulsar1.getNamespaceService().getOwnedServiceUnit(NamespaceName.get(\"pulsar/ns\")) is null");
-        Field stateField = OwnedBundle.class.getDeclaredField("isActive");
-        stateField.setAccessible(true);
-        // set the namespace to be disabled
-        ownerCache.disableOwnership(globalNsBundle);
-
-        // Make sure the namespace config update failed
-        try {
-            admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1"));
-            fail("Should have raised exception");
-        } catch (PreconditionFailedException pfe) {
-            // OK
-        }
-
-        // restore the namespace state
-        ownerCache.removeOwnership(globalNsBundle).get();
-        ownerCache.tryAcquiringOwnership(globalNsBundle);
+    @DataProvider(name = "namespace")
+    public Object[][] namespaceNameProvider() {
+        return new Object[][] { { "pulsar/ns" }, { "pulsar/global/ns" } };
     }
 
-    @Test(enabled = true, timeOut = 30000)
-    public void testReplication() throws Exception {
-
+    @Test(dataProvider = "namespace")
+    public void testReplication(String namespace) throws Exception {
         log.info("--- Starting ReplicatorTest::testReplication ---");
 
         // This test is to verify that the config change on global namespace is successfully applied in broker during
         // runtime.
         // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 3; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopic-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
+        final TopicName dest = TopicName
+                .get(String.format("persistent://%s/repltopic-%d", namespace, System.nanoTime()));
 
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageProducer producer2 = new MessageProducer(url2, dest);
-                    log.info("--- Starting producer --- " + url2);
-
-                    MessageProducer producer3 = new MessageProducer(url3, dest);
-                    log.info("--- Starting producer --- " + url3);
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+        log.info("--- Starting producer --- " + url1);
 
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
+        @Cleanup
+        MessageProducer producer2 = new MessageProducer(url2, dest);
+        log.info("--- Starting producer --- " + url2);
 
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
+        @Cleanup
+        MessageProducer producer3 = new MessageProducer(url3, dest);
+        log.info("--- Starting producer --- " + url3);
 
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
+        @Cleanup
+        MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+        log.info("--- Starting Consumer --- " + url1);
 
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produce(2);
+        @Cleanup
+        MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+        log.info("--- Starting Consumer --- " + url2);
 
-                    consumer1.receive(2);
+        @Cleanup
+        MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+        log.info("--- Starting Consumer --- " + url3);
 
-                    consumer2.receive(2);
+        // Produce from cluster1 and consume from the rest
+        producer1.produce(2);
 
-                    consumer3.receive(2);
+        consumer1.receive(2);
 
-                    // Produce from cluster2 and consume from the rest
-                    producer2.produce(2);
+        consumer2.receive(2);
 
-                    consumer1.receive(2);
+        consumer3.receive(2);
 
-                    consumer2.receive(2);
+        // Produce from cluster2 and consume from the rest
+        producer2.produce(2);
 
-                    consumer3.receive(2);
+        consumer1.receive(2);
 
-                    // Produce from cluster3 and consume from the rest
-                    producer3.produce(2);
+        consumer2.receive(2);
 
-                    consumer1.receive(2);
+        consumer3.receive(2);
 
-                    consumer2.receive(2);
+        // Produce from cluster3 and consume from the rest
+        producer3.produce(2);
 
-                    consumer3.receive(2);
+        consumer1.receive(2);
 
-                    // Produce from cluster1&2 and consume from cluster3
-                    producer1.produce(1);
-                    producer2.produce(1);
+        consumer2.receive(2);
 
-                    consumer1.receive(1);
+        consumer3.receive(2);
 
-                    consumer2.receive(1);
+        // Produce from cluster1&2 and consume from cluster3
+        producer1.produce(1);
+        producer2.produce(1);
 
-                    consumer3.receive(1);
+        consumer1.receive(1);
 
-                    consumer1.receive(1);
+        consumer2.receive(1);
 
-                    consumer2.receive(1);
+        consumer3.receive(1);
 
-                    consumer3.receive(1);
+        consumer1.receive(1);
 
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
+        consumer2.receive(1);
 
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
+        consumer3.receive(1);
     }
 
-    @Test(enabled = false, timeOut = 30000)
+    @Test
     public void testReplicationOverrides() throws Exception {
-
         log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
 
         // This test is to verify that the config change on global namespace is successfully applied in broker during
         // runtime.
         // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
         for (int i = 0; i < 10; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopic-%d", i));
-            testDests.add(dest.toString());
+            final TopicName dest = TopicName
+                    .get(String.format("persistent://pulsar/ns/repltopic-%d", System.nanoTime()));
 
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
+            @Cleanup
+            MessageProducer producer1 = new MessageProducer(url1, dest);
+            log.info("--- Starting producer --- " + url1);
 
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
+            @Cleanup
+            MessageProducer producer2 = new MessageProducer(url2, dest);
+            log.info("--- Starting producer --- " + url2);
 
-                    MessageProducer producer2 = new MessageProducer(url2, dest);
-                    log.info("--- Starting producer --- " + url2);
+            @Cleanup
+            MessageProducer producer3 = new MessageProducer(url3, dest);
+            log.info("--- Starting producer --- " + url3);
 
-                    MessageProducer producer3 = new MessageProducer(url3, dest);
-                    log.info("--- Starting producer --- " + url3);
+            @Cleanup
+            MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+            log.info("--- Starting Consumer --- " + url1);
 
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
+            @Cleanup
+            MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+            log.info("--- Starting Consumer --- " + url2);
 
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
-
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
-
-                    // Produce a message that isn't replicated
-                    producer1.produce(1, producer1.newMessage().disableReplication());
-
-                    // Produce a message not replicated to r2
-                    producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
-
-                    // Produce a default replicated message
-                    producer1.produce(1);
-
-                    consumer1.receive(3);
-                    consumer2.receive(1);
-                    if (!consumer2.drained()) {
-                        throw new Exception("consumer2 - unexpected message in queue");
-                    }
-                    consumer3.receive(2);
-                    if (!consumer3.drained()) {
-                        throw new Exception("consumer3 - unexpected message in queue");
-                    }
-
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
+            @Cleanup
+            MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+            log.info("--- Starting Consumer --- " + url3);
 
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
+            // Produce a message that isn't replicated
+            producer1.produce(1, producer1.newMessage().disableReplication());
+
+            consumer1.receive(1);
+            assertTrue(consumer2.drained());
+            assertTrue(consumer3.drained());
+
+            // Produce a message not replicated to r2
+            producer1.produce(1, producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
+            consumer1.receive(1);
+            assertTrue(consumer2.drained());
+            consumer3.receive(1);
+
+            // Produce a default replicated message
+            producer1.produce(1);
+
+            consumer1.receive(1);
+            consumer2.receive(1);
+            consumer3.receive(1);
+
+            assertTrue(consumer1.drained());
+            assertTrue(consumer2.drained());
+            assertTrue(consumer3.drained());
         }
     }
 
-    @Test(enabled = true, timeOut = 30000)
+    @Test()
     public void testFailures() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testFailures ---");
@@ -479,17 +415,16 @@ public class ReplicatorTest extends ReplicatorTestBase {
     @Test(timeOut = 30000)
     public void testReplicatePeekAndSkip() throws Exception {
 
-        SortedSet<String> testDests = new TreeSet<String>();
-
         final TopicName dest = TopicName.get("persistent://pulsar/ns/peekAndSeekTopic");
-        testDests.add(dest.toString());
 
+        @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        @Cleanup
         MessageConsumer consumer1 = new MessageConsumer(url3, dest);
 
         // Produce from cluster1 and consume from the rest
         producer1.produce(2);
-        producer1.close();
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
         PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
                 .get(topic.getReplicators().keys().get(0));
@@ -497,7 +432,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         CompletableFuture<Entry> result = replicator.peekNthMessage(1);
         Entry entry = result.get(50, TimeUnit.MILLISECONDS);
         assertNull(entry);
-        consumer1.close();
     }
 
     @Test(timeOut = 30000)
@@ -509,12 +443,14 @@ public class ReplicatorTest extends ReplicatorTestBase {
         final TopicName dest = TopicName.get("persistent://pulsar/ns/clearBacklogTopic");
         testDests.add(dest.toString());
 
+        @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
+
+        @Cleanup
         MessageConsumer consumer1 = new MessageConsumer(url3, dest);
 
         // Produce from cluster1 and consume from the rest
         producer1.produce(2);
-        producer1.close();
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
         PersistentReplicator replicator = (PersistentReplicator) spy(
                 topic.getReplicators().get(topic.getReplicators().keys().get(0)));
@@ -525,7 +461,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         replicator.expireMessages(1); // for code-coverage
         ReplicatorStats status = replicator.getStats();
         assertTrue(status.replicationBacklog == 0);
-        consumer1.close();
     }
 
     @Test(enabled = true, timeOut = 30000)
@@ -534,116 +469,73 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
 
         // This test is to verify that reset cursor fails on global topic
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 1; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/resetrepltopic-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
+        final TopicName dest = TopicName
+                .get(String.format("persistent://pulsar/ns/resetrepltopic-%d", System.nanoTime()));
 
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest);
+        log.info("--- Starting producer --- " + url1);
 
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produce(2);
+        @Cleanup
+        MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+        log.info("--- Starting Consumer --- " + url1);
 
-                    consumer1.receive(2);
+        // Produce from cluster1 and consume from the rest
+        producer1.produce(2);
 
-                    producer1.close();
-                    consumer1.close();
-                    return null;
-                }
-            }));
-        }
+        consumer1.receive(2);
 
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-        admin1.topics().resetCursor(testDests.first(), "sub-id", System.currentTimeMillis());
+        admin1.topics().resetCursor(dest.toString(), "sub-id", System.currentTimeMillis());
     }
 
-    @Test(enabled = true, timeOut = 30000)
+    @Test
     public void testReplicationForBatchMessages() throws Exception {
-
         log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
 
         // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 3; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
+        final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime()));
 
-                    MessageProducer producer1 = new MessageProducer(url1, dest, true);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageProducer producer2 = new MessageProducer(url2, dest, true);
-                    log.info("--- Starting producer --- " + url2);
+        @Cleanup
+        MessageProducer producer1 = new MessageProducer(url1, dest, true);
+        log.info("--- Starting producer --- " + url1);
 
-                    MessageProducer producer3 = new MessageProducer(url3, dest, true);
-                    log.info("--- Starting producer --- " + url3);
+        @Cleanup
+        MessageProducer producer2 = new MessageProducer(url2, dest, true);
+        log.info("--- Starting producer --- " + url2);
 
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
+        @Cleanup
+        MessageProducer producer3 = new MessageProducer(url3, dest, true);
+        log.info("--- Starting producer --- " + url3);
 
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
+        @Cleanup
+        MessageConsumer consumer1 = new MessageConsumer(url1, dest);
+        log.info("--- Starting Consumer --- " + url1);
 
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
+        @Cleanup
+        MessageConsumer consumer2 = new MessageConsumer(url2, dest);
+        log.info("--- Starting Consumer --- " + url2);
 
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produceBatch(10);
+        @Cleanup
+        MessageConsumer consumer3 = new MessageConsumer(url3, dest);
+        log.info("--- Starting Consumer --- " + url3);
 
-                    consumer1.receive(10);
+        // Produce from cluster1 and consume from the rest
+        producer1.produceBatch(10);
 
-                    consumer2.receive(10);
+        consumer1.receive(10);
 
-                    consumer3.receive(10);
+        consumer2.receive(10);
 
-                    // Produce from cluster2 and consume from the rest
-                    producer2.produceBatch(10);
+        consumer3.receive(10);
 
-                    consumer1.receive(10);
+        // Produce from cluster2 and consume from the rest
+        producer2.produceBatch(10);
 
-                    consumer2.receive(10);
+        consumer1.receive(10);
 
-                    consumer3.receive(10);
+        consumer2.receive(10);
 
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get(5, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
+        consumer3.receive(10);
     }
 
     /**
@@ -657,6 +549,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
         final String topicName = "persistent://pulsar/ns/repltopicbatch";
         final TopicName dest = TopicName.get(topicName);
+
+        @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
         final String replicatorClusterName = topic.getReplicators().keys().get(0);
@@ -688,8 +582,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
             assertNull(topic.getPersistentReplicator(replicatorClusterName));
             return null;
         });
-
-        producer1.close();
     }
 
     @SuppressWarnings("unchecked")
@@ -698,6 +590,8 @@ public class ReplicatorTest extends ReplicatorTestBase {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
         final String topicName = "persistent://pulsar/ns/repltopicbatch";
         final TopicName dest = TopicName.get(topicName);
+
+        @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
         PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
         final String replicatorClusterName = topic.getReplicators().keys().get(0);
@@ -710,7 +604,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         field.setAccessible(true);
         ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) field.get(replicator);
         assertNull(producer);
-        producer1.close();
     }
 
     /**
@@ -737,9 +630,11 @@ public class ReplicatorTest extends ReplicatorTestBase {
                     .get(String.format("persistent://pulsar/ns1/%s-%d", policy, System.currentTimeMillis()));
 
             // Producer on r1
+            @Cleanup
             MessageProducer producer1 = new MessageProducer(url1, dest);
 
             // Consumer on r2
+            @Cleanup
             MessageConsumer consumer2 = new MessageConsumer(url2, dest);
 
             // Replicator for r1 -> r2
@@ -780,9 +675,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
             }
 
             assertEquals(replicator.getStats().replicationBacklog, 0);
-
-            producer1.close();
-            consumer2.close();
         }
     }
 
@@ -796,10 +688,13 @@ public class ReplicatorTest extends ReplicatorTestBase {
     public void testCloseReplicatorStartProducer() throws Exception {
         TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor");
         // Producer on r1
+        @Cleanup
         MessageProducer producer1 = new MessageProducer(url1, dest);
         // Consumer on r1
+        @Cleanup
         MessageConsumer consumer1 = new MessageConsumer(url1, dest);
         // Consumer on r2
+        @Cleanup
         MessageConsumer consumer2 = new MessageConsumer(url2, dest);
 
         // Replicator for r1 -> r2
@@ -833,10 +728,6 @@ public class ReplicatorTest extends ReplicatorTestBase {
         @SuppressWarnings("unchecked")
         ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
         assertEquals(replicatorProducer, null);
-
-        producer1.close();
-        consumer1.close();
-        consumer2.close();
     }
 
     @Test(timeOut = 30000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 6aac08a..73b9b91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -19,13 +19,15 @@
 package org.apache.pulsar.broker.service;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
 import com.google.common.collect.Sets;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
+
 import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -37,16 +39,15 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
 import org.slf4j.Logger;
@@ -80,7 +81,8 @@ public class ReplicatorTestBase {
 
     ZookeeperServerTest globalZkS;
 
-    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+            new DefaultThreadFactory("ReplicatorTestBase"));
 
     static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
 
@@ -211,13 +213,10 @@ public class ReplicatorTestBase {
         admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
                 pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
 
-        admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
         admin1.tenants().createTenant("pulsar",
                 new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
-        admin1.namespaces().createNamespace("pulsar/ns");
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
-        admin1.namespaces().createNamespace("pulsar/ns1");
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/ns1", Sets.newHashSet("r1", "r2"));
+        admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet("r1", "r2", "r3"));
+        admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet("r1", "r2"));
 
         assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
         assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
@@ -226,6 +225,11 @@ public class ReplicatorTestBase {
         assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
         assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
 
+        // Also create V1 namespace for compatibility check
+        admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
+        admin1.namespaces().createNamespace("pulsar/global/ns");
+        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
+
         Thread.sleep(100);
         log.info("--- ReplicatorTestBase::setup completed ---");
 
@@ -253,7 +257,7 @@ public class ReplicatorTestBase {
         globalZkS.stop();
     }
 
-    static class MessageProducer {
+    static class MessageProducer implements AutoCloseable {
         URL url;
         String namespace;
         String topicName;
@@ -289,13 +293,12 @@ public class ReplicatorTestBase {
 
         void produceBatch(int messages) throws Exception {
             log.info("Start sending batch messages");
-            List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
 
             for (int i = 0; i < messages; i++) {
-                futureList.add(producer.sendAsync(("test-" + i).getBytes()));
+                producer.sendAsync(("test-" + i).getBytes());
                 log.info("queued message {}", ("test-" + i));
             }
-            FutureUtil.waitForAll(futureList).get();
+            producer.flush();
         }
 
         void produce(int messages) throws Exception {
@@ -315,19 +318,23 @@ public class ReplicatorTestBase {
         void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
             log.info("Start sending messages");
             for (int i = 0; i < messages; i++) {
-                final String m = new String("test-builder-" + i);
+                final String m = new String("test-" + i);
                 messageBuilder.value(m.getBytes()).send();
                 log.info("Sent message {}", m);
             }
         }
 
-        void close() throws Exception {
-            client.close();
+        public void close() {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close client", e);
+            }
         }
 
     }
 
-    static class MessageConsumer {
+    static class MessageConsumer implements AutoCloseable {
         final URL url;
         final String namespace;
         final String topicName;
@@ -357,12 +364,24 @@ public class ReplicatorTestBase {
             log.info("Start receiving messages");
             Message<byte[]> msg;
 
-            for (int i = 0; i < messages; i++) {
-                msg = consumer.receive();
+            Set<String> receivedMessages = new TreeSet<>();
+
+            int i = 0;
+            while (i < messages) {
+                msg = consumer.receive(10, TimeUnit.SECONDS);
+                assertNotNull(msg);
                 consumer.acknowledge(msg);
+
                 String msgData = new String(msg.getData());
-                assertEquals(msgData, "test-" + i);
                 log.info("Received message {}", msgData);
+
+                boolean added = receivedMessages.add(msgData);
+                if (added) {
+                    assertEquals(msgData, "test-" + i);
+                    i++;
+                } else {
+                    log.info("Ignoring duplicate {}", msgData);
+                }
             }
         }
 
@@ -370,8 +389,12 @@ public class ReplicatorTestBase {
             return consumer.receive(0, TimeUnit.MICROSECONDS) == null;
         }
 
-        void close() throws Exception {
-            client.close();
+        public void close() {
+            try {
+                client.close();
+            } catch (PulsarClientException e) {
+                log.warn("Failed to close client", e);
+            }
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
deleted file mode 100644
index 5d51db8..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
+++ /dev/null
@@ -1,937 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.v1;
-
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import com.google.common.collect.Sets;
-import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
-import io.netty.buffer.ByteBuf;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
-import org.apache.bookkeeper.mledger.Entry;
-import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.ManagedLedgerException;
-import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.namespace.OwnedBundle;
-import org.apache.pulsar.broker.namespace.OwnershipCache;
-import org.apache.pulsar.broker.service.AbstractReplicator;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
-import org.apache.pulsar.broker.service.Replicator;
-import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.RawMessage;
-import org.apache.pulsar.client.api.RawReader;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.ProducerImpl;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
-import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-import org.testng.collections.Lists;
-
-/**
- * Starts 2 brokers that are in 2 different clusters
- */
-public class V1_ReplicatorTest extends V1_ReplicatorTestBase {
-
-    protected String methodName;
-
-    @BeforeMethod
-    public void beforeMethod(Method m) {
-        methodName = m.getName();
-    }
-
-    @Override
-    @BeforeClass(timeOut = 60000)
-    void setup() throws Exception {
-        super.setup();
-    }
-
-    @Override
-    @AfterClass(timeOut = 60000)
-    void shutdown() throws Exception {
-        super.shutdown();
-    }
-
-    @DataProvider(name = "partitionedTopic")
-    public Object[][] partitionedTopicProvider() {
-        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
-    }
-
-    @Test(enabled = true, timeOut = 30000)
-    public void testConfigChange() throws Exception {
-        log.info("--- Starting V1_ReplicatorTest::testConfigChange ---");
-        // This test is to verify that the config change on global namespace is successfully applied in broker during
-        // runtime.
-        // Run a set of producer tasks to create the topics
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 10; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/topic-%d", i));
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageConsumer consumer = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    producer.produce(2);
-
-                    consumer.receive(2);
-
-                    producer.close();
-                    consumer.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-
-        Thread.sleep(1000L);
-        // Make sure that the internal replicators map contains remote cluster info
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients1 = ns1.getReplicationClients();
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients2 = ns2.getReplicationClients();
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients3 = ns3.getReplicationClients();
-
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 1: Update the global namespace replication configuration to only contains the local cluster itself
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1"));
-
-        // Wait for config changes to be updated.
-        Thread.sleep(1000L);
-
-        // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 2: Update the configuration back
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
-
-        // Wait for config changes to be updated.
-        Thread.sleep(1000L);
-
-        // Make sure that the internal replicators map still contains remote cluster info
-        Assert.assertNotNull(replicationClients1.get("r2"));
-        Assert.assertNotNull(replicationClients1.get("r3"));
-        Assert.assertNotNull(replicationClients2.get("r1"));
-        Assert.assertNotNull(replicationClients2.get("r3"));
-        Assert.assertNotNull(replicationClients3.get("r1"));
-        Assert.assertNotNull(replicationClients3.get("r2"));
-
-        // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeOut = 30000)
-    public void testConcurrentReplicator() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testConcurrentReplicator ---");
-
-        final String namespace = "pulsar/global/concurrent";
-        admin1.namespaces().createNamespace(namespace);
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
-        final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
-        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
-                .build();
-        Producer<byte[]> producer = client1.newProducer().topic(topicName.toString()).create();
-        producer.close();
-
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
-
-        PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
-        final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
-        startRepl.setAccessible(true);
-
-        Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
-        replClientField.setAccessible(true);
-        ConcurrentOpenHashMap<String, PulsarClient> replicationClients = (ConcurrentOpenHashMap<String, PulsarClient>) replClientField
-                .get(pulsar1.getBrokerService());
-        replicationClients.put("r3", pulsarClient);
-
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
-        ExecutorService executor = Executors.newFixedThreadPool(5);
-        for (int i = 0; i < 5; i++) {
-            executor.submit(() -> {
-                try {
-                    startRepl.invoke(topic, "r3");
-                } catch (Exception e) {
-                    fail("setting replicator failed", e);
-                }
-            });
-        }
-        Thread.sleep(3000);
-
-        Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
-                Mockito.any(Schema.class), eq(null));
-
-        client1.shutdown();
-    }
-
-    @Test(enabled = false, timeOut = 30000)
-    public void testConfigChangeNegativeCases() throws Exception {
-        log.info("--- Starting V1_ReplicatorTest::testConfigChangeNegativeCases ---");
-        // Negative test cases for global namespace config change. Verify that the namespace config change can not be
-        // updated when the namespace is being unloaded.
-        // Set up field access to internal namespace state in NamespaceService
-        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
-        ownershipCacheField.setAccessible(true);
-        OwnershipCache ownerCache = (OwnershipCache) ownershipCacheField.get(pulsar1.getNamespaceService());
-        Assert.assertNotNull(pulsar1, "pulsar1 is null");
-        Assert.assertNotNull(pulsar1.getNamespaceService(), "pulsar1.getNamespaceService() is null");
-        NamespaceBundle globalNsBundle = pulsar1.getNamespaceService().getNamespaceBundleFactory()
-                .getFullBundle(NamespaceName.get("pulsar/global/ns"));
-        ownerCache.tryAcquiringOwnership(globalNsBundle);
-        Assert.assertNotNull(ownerCache.getOwnedBundle(globalNsBundle),
-                "pulsar1.getNamespaceService().getOwnedServiceUnit(NamespaceName.get(\"pulsar/global/ns\")) is null");
-        Field stateField = OwnedBundle.class.getDeclaredField("isActive");
-        stateField.setAccessible(true);
-        // set the namespace to be disabled
-        ownerCache.disableOwnership(globalNsBundle);
-
-        // Make sure the namespace config update failed
-        try {
-            admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1"));
-            fail("Should have raised exception");
-        } catch (PreconditionFailedException pfe) {
-            // OK
-        }
-
-        // restore the namespace state
-        ownerCache.removeOwnership(globalNsBundle).get();
-        ownerCache.tryAcquiringOwnership(globalNsBundle);
-    }
-
-    @Test(enabled = true, timeOut = 30000)
-    public void testReplication() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testReplication ---");
-
-        // This test is to verify that the config change on global namespace is successfully applied in broker during
-        // runtime.
-        // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 3; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageProducer producer2 = new MessageProducer(url2, dest);
-                    log.info("--- Starting producer --- " + url2);
-
-                    MessageProducer producer3 = new MessageProducer(url3, dest);
-                    log.info("--- Starting producer --- " + url3);
-
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
-
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
-
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produce(2);
-
-                    consumer1.receive(2);
-
-                    consumer2.receive(2);
-
-                    consumer3.receive(2);
-
-                    // Produce from cluster2 and consume from the rest
-                    producer2.produce(2);
-
-                    consumer1.receive(2);
-
-                    consumer2.receive(2);
-
-                    consumer3.receive(2);
-
-                    // Produce from cluster3 and consume from the rest
-                    producer3.produce(2);
-
-                    consumer1.receive(2);
-
-                    consumer2.receive(2);
-
-                    consumer3.receive(2);
-
-                    // Produce from cluster1&2 and consume from cluster3
-                    producer1.produce(1);
-                    producer2.produce(1);
-
-                    consumer1.receive(1);
-
-                    consumer2.receive(1);
-
-                    consumer3.receive(1);
-
-                    consumer1.receive(1);
-
-                    consumer2.receive(1);
-
-                    consumer3.receive(1);
-
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-    }
-
-    @Test(enabled = false, timeOut = 30000)
-    public void testReplicationOverrides() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testReplicationOverrides ---");
-
-        // This test is to verify that the config change on global namespace is successfully applied in broker during
-        // runtime.
-        // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 10; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageProducer producer2 = new MessageProducer(url2, dest);
-                    log.info("--- Starting producer --- " + url2);
-
-                    MessageProducer producer3 = new MessageProducer(url3, dest);
-                    log.info("--- Starting producer --- " + url3);
-
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
-
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
-
-                    // Produce a message that isn't replicated
-                    producer1.produce(1, producer1.newMessage().disableReplication());
-
-                    // Produce a message not replicated to r2
-                    producer1.produce(1,
-                            producer1.newMessage().replicationClusters(Lists.newArrayList("r1", "r3")));
-
-                    // Produce a default replicated message
-                    producer1.produce(1);
-
-                    consumer1.receive(3);
-                    consumer2.receive(1);
-                    if (!consumer2.drained()) {
-                        throw new Exception("consumer2 - unexpected message in queue");
-                    }
-                    consumer3.receive(2);
-                    if (!consumer3.drained()) {
-                        throw new Exception("consumer3 - unexpected message in queue");
-                    }
-
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-    }
-
-    @Test(enabled = true, timeOut = 30000)
-    public void testFailures() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testFailures ---");
-
-        try {
-            // 1. Create a consumer using the reserved consumer id prefix "pulsar.repl."
-
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/res-cons-id"));
-
-            // Create another consumer using replication prefix as sub id
-            MessageConsumer consumer = new MessageConsumer(url2, dest, "pulsar.repl.");
-            consumer.close();
-
-        } catch (Exception e) {
-            // SUCCESS
-        }
-
-    }
-
-    @Test(timeOut = 30000)
-    public void testReplicatePeekAndSkip() throws Exception {
-
-        SortedSet<String> testDests = new TreeSet<String>();
-
-        final TopicName dest = TopicName.get("persistent://pulsar/global/ns/peekAndSeekTopic");
-        testDests.add(dest.toString());
-
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-        MessageConsumer consumer1 = new MessageConsumer(url3, dest);
-
-        // Produce from cluster1 and consume from the rest
-        producer1.produce(2);
-        producer1.close();
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-        PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators()
-                .get(topic.getReplicators().keys().get(0));
-        replicator.skipMessages(2);
-        CompletableFuture<Entry> result = replicator.peekNthMessage(1);
-        Entry entry = result.get(50, TimeUnit.MILLISECONDS);
-        assertNull(entry);
-        consumer1.close();
-    }
-
-    @Test(timeOut = 30000)
-    public void testReplicatorClearBacklog() throws Exception {
-
-        // This test is to verify that reset cursor fails on global topic
-        SortedSet<String> testDests = new TreeSet<String>();
-
-        final TopicName dest = TopicName.get("persistent://pulsar/global/ns/clearBacklogTopic");
-        testDests.add(dest.toString());
-
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-        MessageConsumer consumer1 = new MessageConsumer(url3, dest);
-
-        // Produce from cluster1 and consume from the rest
-        producer1.produce(2);
-        producer1.close();
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-        PersistentReplicator replicator = (PersistentReplicator) spy(
-                topic.getReplicators().get(topic.getReplicators().keys().get(0)));
-        replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
-        replicator.clearBacklog().get();
-        Thread.sleep(100);
-        replicator.updateRates(); // for code-coverage
-        replicator.expireMessages(1); // for code-coverage
-        ReplicatorStats status = replicator.getStats();
-        assertTrue(status.replicationBacklog == 0);
-        consumer1.close();
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test(enabled = true, timeOut = 30000)
-    public void testResetCursorNotFail() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testResetCursorNotFail ---");
-
-        // This test is to verify that reset cursor fails on global topic
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 1; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/resetrepltopic-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer1 = new MessageProducer(url1, dest);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produce(2);
-
-                    consumer1.receive(2);
-
-                    producer1.close();
-                    consumer1.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get();
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-        admin1.persistentTopics().resetCursor(testDests.first(), "sub-id", System.currentTimeMillis());
-    }
-
-    @Test(enabled = true, timeOut = 30000)
-    public void testReplicationForBatchMessages() throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::testReplicationForBatchMessages ---");
-
-        // Run a set of producer tasks to create the topics
-        SortedSet<String> testDests = new TreeSet<String>();
-        List<Future<Void>> results = Lists.newArrayList();
-        for (int i = 0; i < 3; i++) {
-            final TopicName dest = TopicName.get(String.format("persistent://pulsar/global/ns/repltopicbatch-%d", i));
-            testDests.add(dest.toString());
-
-            results.add(executor.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-
-                    MessageProducer producer1 = new MessageProducer(url1, dest, true);
-                    log.info("--- Starting producer --- " + url1);
-
-                    MessageProducer producer2 = new MessageProducer(url2, dest, true);
-                    log.info("--- Starting producer --- " + url2);
-
-                    MessageProducer producer3 = new MessageProducer(url3, dest, true);
-                    log.info("--- Starting producer --- " + url3);
-
-                    MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-                    log.info("--- Starting Consumer --- " + url1);
-
-                    MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-                    log.info("--- Starting Consumer --- " + url2);
-
-                    MessageConsumer consumer3 = new MessageConsumer(url3, dest);
-                    log.info("--- Starting Consumer --- " + url3);
-
-                    // Produce from cluster1 and consume from the rest
-                    producer1.produceBatch(10);
-
-                    consumer1.receive(10);
-
-                    consumer2.receive(10);
-
-                    consumer3.receive(10);
-
-                    // Produce from cluster2 and consume from the rest
-                    producer2.produceBatch(10);
-
-                    consumer1.receive(10);
-
-                    consumer2.receive(10);
-
-                    consumer3.receive(10);
-
-                    producer1.close();
-                    producer2.close();
-                    producer3.close();
-                    consumer1.close();
-                    consumer2.close();
-                    consumer3.close();
-                    return null;
-                }
-            }));
-        }
-
-        for (Future<Void> result : results) {
-            try {
-                result.get(5, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("exception in getting future result ", e);
-                fail(String.format("replication test failed with %s exception", e.getMessage()));
-            }
-        }
-    }
-
-    /**
-     * It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
-     * it should have cleaned up from the list
-     *
-     * @throws Exception
-     */
-    @Test(timeOut = 30000)
-    public void testDeleteReplicatorFailure() throws Exception {
-        log.info("--- Starting V1_ReplicatorTest::testDeleteReplicatorFailure ---");
-        final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
-        final TopicName dest = TopicName.get(topicName);
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
-        final String replicatorClusterName = topic.getReplicators().keys().get(0);
-        ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
-        CountDownLatch latch = new CountDownLatch(1);
-        // delete cursor already : so next time if topic.removeReplicator will get exception but then it should
-        // remove-replicator from the list even with failure
-        ledger.asyncDeleteCursor("pulsar.repl." + replicatorClusterName, new DeleteCursorCallback() {
-            @Override
-            public void deleteCursorComplete(Object ctx) {
-                latch.countDown();
-            }
-
-            @Override
-            public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
-                latch.countDown();
-            }
-        }, null);
-        latch.await();
-
-        Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
-        removeReplicator.setAccessible(true);
-        // invoke removeReplicator : it fails as cursor is not present: but still it should remove the replicator from
-        // list without restarting it
-        @SuppressWarnings("unchecked")
-        CompletableFuture<Void> result = (CompletableFuture<Void>) removeReplicator.invoke(topic,
-                replicatorClusterName);
-        result.thenApply((v) -> {
-            assertNull(topic.getPersistentReplicator(replicatorClusterName));
-            return null;
-        });
-
-        producer1.close();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(priority = 5, timeOut = 30000)
-    public void testReplicatorProducerClosing() throws Exception {
-        log.info("--- Starting V1_ReplicatorTest::testDeleteReplicatorFailure ---");
-        final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
-        final TopicName dest = TopicName.get(topicName);
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get();
-        final String replicatorClusterName = topic.getReplicators().keys().get(0);
-        Replicator replicator = topic.getPersistentReplicator(replicatorClusterName);
-        pulsar2.close();
-        pulsar3.close();
-        replicator.disconnect(false);
-        Thread.sleep(100);
-        Field field = AbstractReplicator.class.getDeclaredField("producer");
-        field.setAccessible(true);
-        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) field.get(replicator);
-        assertNull(producer);
-        producer1.close();
-    }
-
-    /**
-     * Issue #199
-     *
-     * It verifies that: if the remote cluster reaches backlog quota limit, replicator temporarily stops and once the
-     * backlog drains it should resume replication.
-     *
-     * @throws Exception
-     */
-
-    @Test(timeOut = 60000, enabled = true, priority = -1)
-    public void testResumptionAfterBacklogRelaxed() throws Exception {
-        List<RetentionPolicy> policies = Lists.newArrayList();
-        policies.add(RetentionPolicy.producer_exception);
-        policies.add(RetentionPolicy.producer_request_hold);
-
-        for (RetentionPolicy policy : policies) {
-            // Use 1Mb quota by default
-            admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1 * 1024 * 1024, policy));
-            Thread.sleep(200);
-
-            TopicName dest = TopicName
-                    .get(String.format("persistent://pulsar/global/ns1/%s-%d", policy, System.currentTimeMillis()));
-
-            // Producer on r1
-            MessageProducer producer1 = new MessageProducer(url1, dest);
-
-            // Consumer on r2
-            MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-
-            // Replicator for r1 -> r2
-            PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-            Replicator replicator = topic.getPersistentReplicator("r2");
-
-            // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of
-            // local backlog
-            producer1.produce(1);
-
-            Thread.sleep(500);
-
-            // Restrict backlog quota limit to 1 byte to stop replication
-            admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1, policy));
-
-            Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
-
-            assertEquals(replicator.getStats().replicationBacklog, 0);
-
-            // Next message will not be replicated, because r2 has reached the quota
-            producer1.produce(1);
-
-            Thread.sleep(500);
-
-            assertEquals(replicator.getStats().replicationBacklog, 1);
-
-            // Consumer will now drain 1 message and the replication backlog will be cleared
-            consumer2.receive(1);
-
-            // Wait until the 2nd message got delivered to consumer
-            consumer2.receive(1);
-
-            int retry = 10;
-            for (int i = 0; i < retry && replicator.getStats().replicationBacklog > 0; i++) {
-                if (i != retry - 1) {
-                    Thread.sleep(100);
-                }
-            }
-
-            assertEquals(replicator.getStats().replicationBacklog, 0);
-
-            producer1.close();
-            consumer2.close();
-        }
-    }
-
-    /**
-     * It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and
-     * it should closed the producer as cursor is already closed because replicator is already deleted.
-     *
-     * @throws Exception
-     */
-    @Test(timeOut = 15000)
-    public void testCloseReplicatorStartProducer() throws Exception {
-
-        TopicName dest = TopicName.get("persistent://pulsar/global/ns1/closeCursor");
-        // Producer on r1
-        MessageProducer producer1 = new MessageProducer(url1, dest);
-        // Consumer on r1
-        MessageConsumer consumer1 = new MessageConsumer(url1, dest);
-        // Consumer on r2
-        MessageConsumer consumer2 = new MessageConsumer(url2, dest);
-
-        // Replicator for r1 -> r2
-        PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
-        PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2");
-
-        // close the cursor
-        Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
-        cursorField.setAccessible(true);
-        ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
-        cursor.close();
-        // try to read entries
-        CountDownLatch latch = new CountDownLatch(1);
-        producer1.produce(10);
-        cursor.asyncReadEntriesOrWait(10, new ReadEntriesCallback() {
-            @Override
-            public void readEntriesComplete(List<Entry> entries, Object ctx) {
-                latch.countDown();
-                fail("it should have been failed");
-            }
-
-            @Override
-            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-                latch.countDown();
-                assertTrue(exception instanceof CursorAlreadyClosedException);
-            }
-        }, null);
-
-        // replicator-readException: cursorAlreadyClosed
-        replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null);
-
-        // wait replicator producer to be closed
-        Thread.sleep(1000);
-
-        // Replicator producer must be closed
-        Field producerField = AbstractReplicator.class.getDeclaredField("producer");
-        producerField.setAccessible(true);
-        @SuppressWarnings("unchecked")
-        ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
-        assertEquals(replicatorProducer, null);
-
-        producer1.close();
-        consumer1.close();
-        consumer2.close();
-    }
-
-    @Test(timeOut = 30000)
-    public void verifyChecksumAfterReplication() throws Exception {
-        final String topicName = "persistent://pulsar/global/ns/checksumAfterReplication";
-
-        PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
-        Producer<byte[]> p1 = c1.newProducer().topic(topicName).create();
-
-        PulsarClient c2 = PulsarClient.builder().serviceUrl(url2.toString()).build();
-        RawReader reader2 = RawReader.create(c2, topicName, "sub").get();
-
-        p1.send("Hello".getBytes());
-
-        RawMessage msg = reader2.readNextAsync().get();
-
-        ByteBuf b = msg.getHeadersAndPayload();
-
-        assertTrue(Commands.hasChecksum(b));
-        int parsedChecksum = Commands.readChecksum(b);
-        int computedChecksum = Crc32cIntChecksum.computeChecksum(b);
-
-        assertEquals(parsedChecksum, computedChecksum);
-
-        p1.close();
-        reader2.closeAsync().get();
-    }
-
-    /**
-     * It verifies that broker should not start replicator for partitioned-topic (topic without -partition postfix)
-     *
-     * @param isPartitionedTopic
-     * @throws Exception
-     */
-    @SuppressWarnings("deprecation")
-    @Test(dataProvider = "partitionedTopic")
-    public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws Exception {
-
-        log.info("--- Starting V1_ReplicatorTest::{} --- ", methodName);
-
-        final String namespace = "pulsar/global/partitionedNs-" + isPartitionedTopic;
-        final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
-        final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
-        BrokerService brokerService = pulsar1.getBrokerService();
-
-        admin1.namespaces().createNamespace(namespace);
-        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
-
-        if (isPartitionedTopic) {
-            admin1.persistentTopics().createPartitionedTopic(persistentTopicName, 5);
-            admin1.nonPersistentTopics().createPartitionedTopic(nonPersistentTopicName, 5);
-        }
-
-        // load namespace with dummy topic on ns
-        PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString()).build();
-        client.newProducer().topic("persistent://" + namespace + "/dummyTopic").create();
-
-        // persistent topic test
-        try {
-            brokerService.getOrCreateTopic(persistentTopicName).get();
-            if (isPartitionedTopic) {
-                fail("Topic creation fails with partitioned topic as replicator init fails");
-            }
-        } catch (Exception e) {
-            if (!isPartitionedTopic) {
-                fail("Topic creation should not fail without any partitioned topic");
-            }
-            assertTrue(e.getCause() instanceof NamingException);
-        }
-
-        // non-persistent topic test
-        try {
-            brokerService.getOrCreateTopic(nonPersistentTopicName).get();
-            if (isPartitionedTopic) {
-                fail("Topic creation fails with partitioned topic as replicator init fails");
-            }
-        } catch (Exception e) {
-            if (!isPartitionedTopic) {
-                fail("Topic creation should not fail without any partitioned topic");
-            }
-            assertTrue(e.getCause() instanceof NamingException);
-        }
-
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(V1_ReplicatorTest.class);
-
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
deleted file mode 100644
index 4026632..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service.v1;
-
-import static org.testng.Assert.assertEquals;
-
-import com.google.common.collect.Sets;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.test.PortManager;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
-import org.apache.pulsar.zookeeper.ZookeeperServerTest;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class V1_ReplicatorTestBase {
-    URL url1;
-    URL urlTls1;
-    ServiceConfiguration config1 = new ServiceConfiguration();
-    PulsarService pulsar1;
-    BrokerService ns1;
-
-    PulsarAdmin admin1;
-    LocalBookkeeperEnsemble bkEnsemble1;
-
-    URL url2;
-    URL urlTls2;
-    ServiceConfiguration config2 = new ServiceConfiguration();
-    PulsarService pulsar2;
-    BrokerService ns2;
-    PulsarAdmin admin2;
-    LocalBookkeeperEnsemble bkEnsemble2;
-
-    URL url3;
-    URL urlTls3;
-    ServiceConfiguration config3 = new ServiceConfiguration();
-    PulsarService pulsar3;
-    BrokerService ns3;
-    PulsarAdmin admin3;
-    LocalBookkeeperEnsemble bkEnsemble3;
-
-    ZookeeperServerTest globalZkS;
-
-    ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-
-    static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
-
-    protected final static String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
-    protected final static String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
-
-    // Default frequency
-    public int getBrokerServicePurgeInactiveFrequency() {
-        return 60;
-    }
-
-    public boolean isBrokerServicePurgeInactiveTopic() {
-        return false;
-    }
-
-    void setup() throws Exception {
-        log.info("--- Starting V1_ReplicatorTestBase::setup ---");
-        int globalZKPort = PortManager.nextFreePort();
-        globalZkS = new ZookeeperServerTest(globalZKPort);
-        globalZkS.start();
-
-        // Start region 1
-        int zkPort1 = PortManager.nextFreePort();
-        bkEnsemble1 = new LocalBookkeeperEnsemble(3, zkPort1, () -> PortManager.nextFreePort());
-        bkEnsemble1.start();
-
-        int webServicePort1 = PortManager.nextFreePort();
-        int webServicePortTls1 = PortManager.nextFreePort();
-
-        // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
-        // completely
-        // independent config objects instead of referring to the same properties object
-        config1.setClusterName("r1");
-        config1.setAdvertisedAddress("localhost");
-        config1.setWebServicePort(webServicePort1);
-        config1.setWebServicePortTls(webServicePortTls1);
-        config1.setZookeeperServers("127.0.0.1:" + zkPort1);
-        config1.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
-        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config1.setBrokerServicePurgeInactiveFrequencyInSeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config1.setBrokerServicePort(PortManager.nextFreePort());
-        config1.setBrokerServicePortTls(PortManager.nextFreePort());
-        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config1.setDefaultNumberOfNamespaceBundles(1);
-        pulsar1 = new PulsarService(config1);
-        pulsar1.start();
-        ns1 = pulsar1.getBrokerService();
-
-        url1 = new URL("http://localhost:" + webServicePort1);
-        urlTls1 = new URL("https://localhost:" + webServicePortTls1);
-        admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();
-
-        // Start region 2
-
-        // Start zk & bks
-        int zkPort2 = PortManager.nextFreePort();
-        bkEnsemble2 = new LocalBookkeeperEnsemble(3, zkPort2, () -> PortManager.nextFreePort());
-        bkEnsemble2.start();
-
-        int webServicePort2 = PortManager.nextFreePort();
-        int webServicePortTls2 = PortManager.nextFreePort();
-        config2.setClusterName("r2");
-        config2.setAdvertisedAddress("localhost");
-        config2.setWebServicePort(webServicePort2);
-        config2.setWebServicePortTls(webServicePortTls2);
-        config2.setZookeeperServers("127.0.0.1:" + zkPort2);
-        config2.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
-        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config2.setBrokerServicePurgeInactiveFrequencyInSeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config2.setBrokerServicePort(PortManager.nextFreePort());
-        config2.setBrokerServicePortTls(PortManager.nextFreePort());
-        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config2.setDefaultNumberOfNamespaceBundles(1);
-        pulsar2 = new PulsarService(config2);
-        pulsar2.start();
-        ns2 = pulsar2.getBrokerService();
-
-        url2 = new URL("http://localhost:" + webServicePort2);
-        urlTls2 = new URL("https://localhost:" + webServicePortTls2);
-        admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();
-
-        // Start region 3
-
-        // Start zk & bks
-        int zkPort3 = PortManager.nextFreePort();
-        bkEnsemble3 = new LocalBookkeeperEnsemble(3, zkPort3, () -> PortManager.nextFreePort());
-        bkEnsemble3.start();
-
-        int webServicePort3 = PortManager.nextFreePort();
-        int webServicePortTls3 = PortManager.nextFreePort();
-        config3.setClusterName("r3");
-        config3.setAdvertisedAddress("localhost");
-        config3.setWebServicePort(webServicePort3);
-        config3.setWebServicePortTls(webServicePortTls3);
-        config3.setZookeeperServers("127.0.0.1:" + zkPort3);
-        config3.setConfigurationStoreServers("127.0.0.1:" + globalZKPort + "/foo");
-        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config3.setBrokerServicePort(PortManager.nextFreePort());
-        config3.setBrokerServicePortTls(PortManager.nextFreePort());
-        config3.setTlsEnabled(true);
-        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setDefaultNumberOfNamespaceBundles(1);
-        pulsar3 = new PulsarService(config3);
-        pulsar3.start();
-        ns3 = pulsar3.getBrokerService();
-
-        url3 = new URL("http://localhost:" + webServicePort3);
-        urlTls3 = new URL("https://localhost:" + webServicePortTls3);
-        admin3 = PulsarAdmin.builder().serviceHttpUrl(url3.toString()).build();
-
-        // Provision the global namespace
-        admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
-                pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
-                pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
-        admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
-                pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));
-
-        admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
-        admin1.tenants().createTenant("pulsar",
-                new TenantInfo(Sets.newHashSet("appid1", "appid2", "appid3"), Sets.newHashSet("r1", "r2", "r3")));
-        admin1.namespaces().createNamespace("pulsar/global/ns");
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet("r1", "r2", "r3"));
-        admin1.namespaces().createNamespace("pulsar/global/ns1");
-        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Sets.newHashSet("r1", "r2"));
-
-        assertEquals(admin2.clusters().getCluster("r1").getServiceUrl(), url1.toString());
-        assertEquals(admin2.clusters().getCluster("r2").getServiceUrl(), url2.toString());
-        assertEquals(admin2.clusters().getCluster("r3").getServiceUrl(), url3.toString());
-        assertEquals(admin2.clusters().getCluster("r1").getBrokerServiceUrl(), pulsar1.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r2").getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl());
-        assertEquals(admin2.clusters().getCluster("r3").getBrokerServiceUrl(), pulsar3.getBrokerServiceUrl());
-
-        Thread.sleep(100);
-        log.info("--- V1_ReplicatorTestBase::setup completed ---");
-
-    }
-
-    private int inSec(int time, TimeUnit unit) {
-        return (int) TimeUnit.SECONDS.convert(time, unit);
-    }
-
-    void shutdown() throws Exception {
-        log.info("--- Shutting down ---");
-        executor.shutdown();
-
-        admin1.close();
-        admin2.close();
-        admin3.close();
-
-        pulsar3.close();
-        pulsar2.close();
-        pulsar1.close();
-
-        bkEnsemble1.stop();
-        bkEnsemble2.stop();
-        bkEnsemble3.stop();
-        globalZkS.stop();
-    }
-
-    static class MessageProducer {
-        URL url;
-        String namespace;
-        String topicName;
-        PulsarClient client;
-        Producer<byte[]> producer;
-
-        MessageProducer(URL url, final TopicName dest) throws Exception {
-            this.url = url;
-            this.namespace = dest.getNamespace();
-            this.topicName = dest.toString();
-            client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-            producer = client.newProducer().topic(topicName).create();
-
-        }
-
-        MessageProducer(URL url, final TopicName dest, boolean batch) throws Exception {
-            this.url = url;
-            this.namespace = dest.getNamespace();
-            this.topicName = dest.toString();
-            client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-            ProducerBuilder<byte[]> producerBuilder = client.newProducer().topic(topicName);
-            if (batch) {
-                producerBuilder.enableBatching(true);
-                producerBuilder.batchingMaxPublishDelay(1, TimeUnit.SECONDS);
-                producerBuilder.batchingMaxMessages(5);
-            }
-            producer = producerBuilder.create();
-
-        }
-
-        void produceBatch(int messages) throws Exception {
-            log.info("Start sending batch messages");
-            List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
-
-            for (int i = 0; i < messages; i++) {
-                futureList.add(producer.sendAsync(("test-" + i).getBytes()));
-                log.info("queued message {}", ("test-" + i));
-            }
-            FutureUtil.waitForAll(futureList).get();
-        }
-
-        void produce(int messages) throws Exception {
-
-            log.info("Start sending messages");
-            for (int i = 0; i < messages; i++) {
-                producer.send(("test-" + i).getBytes());
-                log.info("Sent message {}", ("test-" + i));
-            }
-
-        }
-
-        void produce(int messages, TypedMessageBuilder<byte[]> messageBuilder) throws Exception {
-            log.info("Start sending messages");
-            for (int i = 0; i < messages; i++) {
-                final String m = new String("test-builder-" + i);
-                messageBuilder.value(m.getBytes());
-                messageBuilder.send();
-                log.info("Sent message {}", m);
-            }
-        }
-
-        TypedMessageBuilder<byte[]> newMessage() {
-            return producer.newMessage();
-        }
-
-        void close() throws Exception {
-            client.close();
-        }
-
-    }
-
-    static class MessageConsumer {
-        final URL url;
-        final String namespace;
-        final String topicName;
-        final PulsarClient client;
-        final Consumer<byte[]> consumer;
-
-        MessageConsumer(URL url, final TopicName dest) throws Exception {
-            this(url, dest, "sub-id");
-        }
-
-        MessageConsumer(URL url, final TopicName dest, String subId) throws Exception {
-            this.url = url;
-            this.namespace = dest.getNamespace();
-            this.topicName = dest.toString();
-
-            client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0, TimeUnit.SECONDS).build();
-
-            try {
-                consumer = client.newConsumer().topic(topicName).subscriptionName(subId).subscribe();
-            } catch (Exception e) {
-                client.close();
-                throw e;
-            }
-        }
-
-        void receive(int messages) throws Exception {
-            log.info("Start receiving messages");
-            Message<byte[]> msg = null;
-
-            for (int i = 0; i < messages; i++) {
-                msg = consumer.receive();
-                consumer.acknowledge(msg);
-                String msgData = new String(msg.getData());
-                assertEquals(msgData, "test-" + i);
-                log.info("Received message {}", msgData);
-            }
-        }
-
-        boolean drained() throws Exception {
-            return consumer.receive(0, TimeUnit.MICROSECONDS) == null;
-        }
-
-        void close() throws Exception {
-            client.close();
-        }
-    }
-
-    private static final Logger log = LoggerFactory.getLogger(V1_ReplicatorTestBase.class);
-}