You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 11:55:38 UTC

[pulsar] branch branch-2.9 updated: Fix call sync method in an async callback when enabling geo replicator. (#12590)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new a8a207b  Fix call sync method in an async callback when enabling geo replicator. (#12590)
a8a207b is described below

commit a8a207b1ae11564b98e3332204c0b20cb6c91118
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Nov 6 22:06:18 2021 +0800

    Fix call sync method in an async callback when enabling geo replicator. (#12590)
    
    After enabling geo-replication, we are not able to produce messages to a partitioned non-persistent topic.
    
    Steps to reproduce:
    
    1. Start a geo-replication instance with 2 clusters (in my setup, each cluster has 3 brokers)
    2. Create a non-persistent partitioned topic, e.g. with 10 partitions
    3. Use pulsar-perf to publish messages to the partitioned topic
    4. Verify that the message produce throughput is 0, e.g. with `bin/pulsar-perf produce -s 2048 -r 100 -bm 1 non-persistent://public/default/test`
    
    The root cause is that there are 2 places where a sync method is called inside an async method callback.
    So the fix is:
    
    1. Make the partitioned-topic validation asynchronous (the new `validatePartitionedTopicAsync` method)
    2. Avoid calling the synchronous get-cluster method when getting the replication client
    
    Integration tests added.
    
    (cherry picked from commit a2b7cae3cfe8d3776483b1ecaf69af47949b70e1)
---
 .../pulsar/broker/resources/BaseResources.java     |   4 +
 .../broker/resources/NamespaceResources.java       |  11 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |  31 ++++--
 .../broker/admin/impl/PersistentTopicsBase.java    |  11 +-
 .../pulsar/broker/service/AbstractReplicator.java  |  31 +++---
 .../pulsar/broker/service/BrokerService.java       |  56 +++++++----
 .../nonpersistent/NonPersistentReplicator.java     |   7 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  53 +++++-----
 .../service/persistent/PersistentReplicator.java   |   8 +-
 .../broker/service/persistent/PersistentTopic.java |  85 ++++++++--------
 .../pulsar/broker/service/PersistentTopicTest.java |  10 +-
 .../service/ReplicatorRemoveClusterTest.java       |   4 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   4 +-
 .../integration/messaging/GeoReplicationTest.java  | 112 +++++++++++++++++++++
 .../integration/topologies/PulsarCluster.java      |  59 +++++++----
 .../integration/topologies/PulsarGeoCluster.java   |  82 +++++++++++++++
 .../topologies/PulsarGeoClusterTestBase.java       |  92 +++++++++++++++++
 .../integration/topologies/PulsarTestBase.java     |   9 ++
 .../src/test/resources/pulsar-messaging.xml        |   1 +
 19 files changed, 516 insertions(+), 154 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 9061dd7..1e463fa 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -167,6 +167,10 @@ public class BaseResources<T> {
         }
     }
 
+    protected CompletableFuture<Boolean> existsAsync(String path) {
+        return cache.exists(path);
+    }
+
     public int getOperationTimeoutSec() {
         return operationTimeoutSec;
     }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index dcba2ac..2beeab8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -18,14 +18,11 @@
  */
 package org.apache.pulsar.broker.resources;
 
-import static org.apache.pulsar.common.policies.path.PolicyPath.path;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.Getter;
@@ -39,7 +36,6 @@ import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -122,7 +118,7 @@ public class NamespaceResources extends BaseResources<Policies> {
     }
 
     public static boolean pathIsFromNamespace(String path) {
-        return path.startsWith(BASE_POLICIES_PATH);
+        return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
     }
 
     public static NamespaceName namespaceFromPath(String path) {
@@ -208,6 +204,11 @@ public class NamespaceResources extends BaseResources<Policies> {
                     tn.getEncodedLocalName()));
         }
 
+        public CompletableFuture<Boolean> partitionedTopicExistsAsync(TopicName tn) {
+            return existsAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
+                    tn.getEncodedLocalName()));
+        }
+
         public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {
             return deleteAsync(joinPath(PARTITIONED_TOPIC_PATH, tn.getNamespace(), tn.getDomain().value(),
                     tn.getEncodedLocalName()));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 625f419..f3a94d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -632,10 +632,7 @@ public abstract class AdminResource extends PulsarWebResource {
             return;
         }
 
-        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
-
         CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
-        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
@@ -658,7 +655,13 @@ public abstract class AdminResource extends PulsarWebResource {
             return null;
         });
 
-        FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+        List<String> replicatedClusters = new ArrayList<>();
+        if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+            getNamespaceReplicatedClusters(namespaceName)
+                    .stream().filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+                    .forEach(replicatedClusters::add);
+        }
+        createLocalFuture.whenComplete((ignored, ex) -> {
             if (ex != null) {
                 log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
                 if (ex.getCause() instanceof RestException) {
@@ -669,14 +672,20 @@ public abstract class AdminResource extends PulsarWebResource {
                 return;
             }
 
-            if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
-                getNamespaceReplicatedClusters(namespaceName)
-                        .stream()
-                        .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
-                        .forEach(cluster -> createFutureList.add(
-                                ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+            if (!replicatedClusters.isEmpty()) {
+                replicatedClusters.forEach(cluster -> {
+                    pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+                            .thenAccept(clusterDataOp -> {
+                                ((TopicsImpl) pulsar().getBrokerService()
+                                        .getClusterPulsarAdmin(cluster, clusterDataOp).topics())
                                         .createPartitionedTopicAsync(
-                                                topicName.getPartitionedTopicName(), numPartitions, true)));
+                                                topicName.getPartitionedTopicName(), numPartitions, true);
+                            })
+                            .exceptionally(throwable -> {
+                                log.error("Failed to create partition topic in cluster {}.", cluster, throwable);
+                                return null;
+                            });
+                });
             }
 
             log.info("[{}] Successfully created partitions for topic {} in cluster {}",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 0271abd..a6d8f2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -508,9 +508,14 @@ public class PersistentTopicsBase extends AdminResource {
             if (cluster.equals(pulsar().getConfig().getClusterName())) {
                 return;
             }
-            results.add(pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics()
-                    .updatePartitionedTopicAsync(topicName.toString(),
-                            numPartitions, true, false));
+            CompletableFuture<Void> updatePartitionTopicFuture =
+                pulsar().getPulsarResources().getClusterResources().getClusterAsync(cluster)
+                    .thenApply(clusterDataOp ->
+                            pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterDataOp))
+                    .thenCompose(pulsarAdmin ->
+                            pulsarAdmin.topics().updatePartitionedTopicAsync(
+                                    topicName.toString(), numPartitions, true, false));
+            results.add(updatePartitionTopicFuture);
         });
         return FutureUtil.waitForAll(results);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index b7e85da..59ec74f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,14 +65,14 @@ public abstract class AbstractReplicator {
     }
 
     public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
-                              BrokerService brokerService) throws NamingException, PulsarServerException {
-        validatePartitionedTopic(topicName, brokerService);
+                              BrokerService brokerService, PulsarClientImpl replicationClient)
+            throws PulsarServerException {
         this.brokerService = brokerService;
         this.topicName = topicName;
         this.replicatorPrefix = replicatorPrefix;
         this.localCluster = localCluster.intern();
         this.remoteCluster = remoteCluster.intern();
-        this.replicationClient = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
+        this.replicationClient = replicationClient;
         this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
         this.producer = null;
         this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
@@ -242,20 +243,18 @@ public abstract class AbstractReplicator {
      * @param topic
      * @param brokerService
      */
-    private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
+    public static CompletableFuture<Void> validatePartitionedTopicAsync(String topic, BrokerService brokerService) {
         TopicName topicName = TopicName.get(topic);
-        boolean isPartitionedTopic = false;
-        try {
-            isPartitionedTopic =
-                    brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
-                            .partitionedTopicExists(topicName);
-        } catch (Exception e) {
-            log.warn("Failed to verify partitioned topic {}-{}", topicName, e.getMessage());
-        }
-        if (isPartitionedTopic) {
-            throw new NamingException(
-                    topicName + " is a partitioned-topic and replication can't be started for partitioned-producer ");
-        }
+        return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+            .partitionedTopicExistsAsync(topicName).thenCompose(isPartitionedTopic -> {
+                if (isPartitionedTopic) {
+                    String s = topicName
+                            + " is a partitioned-topic and replication can't be started for partitioned-producer ";
+                    log.error(s);
+                    return FutureUtil.failedFuture(new NamingException(s));
+                }
+                return CompletableFuture.completedFuture(null);
+            });
     }
 
     private static final Logger log = LoggerFactory.getLogger(AbstractReplicator.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c3a241d..456af1f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -994,6 +994,7 @@ public class BrokerService implements Closeable {
     }
 
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
+        CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load non-persistent topic {}", topic);
@@ -1003,27 +1004,40 @@ public class BrokerService implements Closeable {
         }
         final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
         NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
-
-        CompletableFuture<Optional<Topic>> future = nonPersistentTopic.initialize()
-                .thenCompose(__ -> nonPersistentTopic.checkReplication())
-                .thenApply(__ -> {
-            log.info("Created topic {}", nonPersistentTopic);
-            long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
-            pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-            addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
-            return Optional.of(nonPersistentTopic);
-        });
-
-        future.exceptionally((ex) -> {
-            log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex);
-            nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
-                pulsar.getExecutor().execute(() -> topics.remove(topic, future));
+        CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic);
+        isOwner.thenRun(() -> {
+            nonPersistentTopic.initialize()
+                    .thenCompose(__ -> nonPersistentTopic.checkReplication())
+                    .thenRun(() -> {
+                        log.info("Created topic {}", nonPersistentTopic);
+                        long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs;
+                        pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
+                        addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic);
+                        topicFuture.complete(Optional.of(nonPersistentTopic));
+                    }).exceptionally(ex -> {
+                log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause());
+                nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                    pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
+                    topicFuture.completeExceptionally(ex);
+                });
+                return null;
             });
-
+        }).exceptionally(e -> {
+            log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause());
+            // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct
+            // broker. When client get non-persistent-partitioned topic
+            // metadata will the non-persistent-topic will be created.
+            // so we should add checkTopicNsOwnership logic otherwise the topic will be created
+            // if it dont own by this broker,we should return success
+            // otherwise it will keep retrying getPartitionedTopicMetadata
+            topicFuture.complete(Optional.of(nonPersistentTopic));
+            // after get metadata return success, we should delete this topic from this broker, because this topic not
+            // owner by this broker and it don't initialize and checkReplication
+            pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
             return null;
         });
 
-        return future;
+        return topicFuture;
     }
 
     private <T> CompletableFuture<T> futureWithDeadline() {
@@ -1031,7 +1045,7 @@ public class BrokerService implements Closeable {
                 () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
     }
 
-    public PulsarClient getReplicationClient(String cluster) {
+    public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) {
         PulsarClient client = replicationClients.get(cluster);
         if (client != null) {
             return client;
@@ -1039,7 +1053,7 @@ public class BrokerService implements Closeable {
 
         return replicationClients.computeIfAbsent(cluster, key -> {
             try {
-                ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
+                ClusterData data = clusterDataOp
                         .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
                 ClientBuilder clientBuilder = PulsarClient.builder()
                         .enableTcpNoDelay(false)
@@ -1106,14 +1120,14 @@ public class BrokerService implements Closeable {
         }
     }
 
-    public PulsarAdmin getClusterPulsarAdmin(String cluster) {
+    public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) {
         PulsarAdmin admin = clusterAdmins.get(cluster);
         if (admin != null) {
             return admin;
         }
         return clusterAdmins.computeIfAbsent(cluster, key -> {
             try {
-                ClusterData data = pulsar.getPulsarResources().getClusterResources().getCluster(cluster)
+                ClusterData data = clusterDataOp
                         .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster));
 
                 ServiceConfiguration conf = pulsar.getConfig();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index dd57fd9..ce1fe34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -28,13 +28,13 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
 import org.apache.pulsar.common.stats.Rate;
@@ -49,8 +49,9 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli
     private final NonPersistentReplicatorStatsImpl stats = new NonPersistentReplicatorStatsImpl();
 
     public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
-            BrokerService brokerService) throws NamingException, PulsarServerException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+            BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
+        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
+                replicationClient);
 
         producerBuilder.blockIfQueueFull(false);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c1687a3..2c62f2e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -32,13 +32,14 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -61,6 +62,7 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
@@ -528,14 +530,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                         }
 
                         if (!replicators.containsKey(cluster)) {
-                            if (!startReplicator(cluster)) {
-                                // it happens when global topic is a partitioned topic and replicator can't start on
-                                // original
-                                // non partitioned-topic (topic without partition prefix)
-                                return FutureUtil
-                                        .failedFuture(new NamingException(
-                                                topic + " failed to start replicator for " + cluster));
-                            }
+                            futures.add(startReplicator(cluster));
                         }
                     }
 
@@ -553,29 +548,35 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
     }
 
-    boolean startReplicator(String remoteCluster) {
+    CompletableFuture<Void> startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
         String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
         return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
+    protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
             String localCluster) {
-        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
-        replicators.computeIfAbsent(remoteCluster, r -> {
-            try {
-                return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
-            } catch (NamingException | PulsarServerException e) {
-                isReplicatorStarted.set(false);
-                log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
-            }
-            return null;
-        });
-        // clean up replicator if startup is failed
-        if (!isReplicatorStarted.get()) {
-            replicators.remove(remoteCluster);
-        }
-        return isReplicatorStarted.get();
+        return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(remoteCluster)
+                        .thenApply(clusterData ->
+                                brokerService.getReplicationClient(remoteCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    replicators.computeIfAbsent(remoteCluster, r -> {
+                        try {
+                            return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
+                                    remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up replicator if startup is failed
+                    if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
+                        replicators.remove(remoteCluster);
+                    }
+                });
     }
 
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index aa53e15..4d79c9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
 import org.apache.pulsar.broker.service.Replicator;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
@@ -54,6 +53,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -105,8 +105,10 @@ public class PersistentReplicator extends AbstractReplicator
     private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
     public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
-                                BrokerService brokerService) throws NamingException, PulsarServerException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService);
+                                BrokerService brokerService, PulsarClientImpl replicationClient)
+            throws PulsarServerException {
+        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
+                replicationClient);
         this.topic = topic;
         this.cursor = cursor;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 64e4b7f..f74060c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -76,6 +76,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources;
+import org.apache.pulsar.broker.service.AbstractReplicator;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -116,6 +117,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -265,20 +267,8 @@ public class PersistentTopic extends AbstractTopic
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
 
         for (ManagedCursor cursor : ledger.getCursors()) {
-            if (cursor.getName().startsWith(replicatorPrefix)) {
-                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-                String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
-                boolean isReplicatorStarted = false;
-                try {
-                    isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
-                } catch (Exception e) {
-                    log.warn("[{}] failed to start replication", topic, e);
-                }
-                if (!isReplicatorStarted) {
-                    throw new NamingException(
-                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
-                }
-            } else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
+            if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)
+                    || cursor.getName().startsWith(replicatorPrefix)) {
                 // This is not a regular subscription, we are going to
                 // ignore it for now and let the message dedup logic to take care of it
             } else {
@@ -309,7 +299,16 @@ public class PersistentTopic extends AbstractTopic
 
     @Override
     public CompletableFuture<Void> initialize() {
-        return brokerService.pulsar().getPulsarResources().getNamespaceResources()
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (ManagedCursor cursor : ledger.getCursors()) {
+            if (cursor.getName().startsWith(replicatorPrefix)) {
+                String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+                String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
+                futures.add(addReplicationCluster(remoteCluster, cursor, localCluster));
+            }
+        }
+        return FutureUtil.waitForAll(futures).thenCompose(__ ->
+            brokerService.pulsar().getPulsarResources().getNamespaceResources()
                 .getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
                 .thenAccept(optPolicies -> {
                     if (!optPolicies.isPresent()) {
@@ -338,7 +337,7 @@ public class PersistentTopic extends AbstractTopic
                     updateUnackedMessagesAppliedOnSubscription(null);
                     updateUnackedMessagesExceededOnConsumer(null);
                     return null;
-                });
+                }));
     }
 
     // for testing purposes
@@ -1519,13 +1518,13 @@ public class PersistentTopic extends AbstractTopic
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-                boolean isReplicatorStarted = addReplicationCluster(remoteCluster, cursor, localCluster);
-                if (isReplicatorStarted) {
-                    future.complete(null);
-                } else {
-                    future.completeExceptionally(new NamingException(
-                            PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
-                }
+                addReplicationCluster(remoteCluster, cursor, localCluster).whenComplete((__, ex) -> {
+                    if (ex == null) {
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(ex);
+                    }
+                });
             }
 
             @Override
@@ -1538,23 +1537,29 @@ public class PersistentTopic extends AbstractTopic
         return future;
     }
 
-    protected boolean addReplicationCluster(String remoteCluster, ManagedCursor cursor, String localCluster) {
-        AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
-        replicators.computeIfAbsent(remoteCluster, r -> {
-            try {
-                return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster,
-                        brokerService);
-            } catch (NamingException | PulsarServerException e) {
-                isReplicatorStarted.set(false);
-                log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
-            }
-            return null;
-        });
-        // clean up replicator if startup is failed
-        if (!isReplicatorStarted.get()) {
-            replicators.remove(remoteCluster);
-        }
-        return isReplicatorStarted.get();
+    protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, ManagedCursor cursor,
+            String localCluster) {
+        return AbstractReplicator.validatePartitionedTopicAsync(PersistentTopic.this.getName(), brokerService)
+                .thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getClusterAsync(remoteCluster)
+                        .thenApply(clusterData ->
+                                brokerService.getReplicationClient(remoteCluster, clusterData)))
+                .thenAccept(replicationClient -> {
+                    replicators.computeIfAbsent(remoteCluster, r -> {
+                        try {
+                            return new PersistentReplicator(PersistentTopic.this, cursor, localCluster,
+                                    remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
+                        } catch (PulsarServerException e) {
+                            log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
+                        }
+                        return null;
+                    });
+
+                    // clean up the replicator entry if startup failed
+                    if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
+                        replicators.remove(remoteCluster);
+                    }
+                });
     }
 
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 8603404..543d897 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -1768,7 +1768,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(remoteCluster).when(cursor).getName();
         brokerService.getReplicationClients().put(remoteCluster, client);
         PersistentReplicator replicator = spy(
-                new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService));
+                new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService,
+                        (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster,
+                                brokerService.pulsar().getPulsarResources().getClusterResources()
+                                .getCluster(remoteCluster))));
         replicatorMap.put(remoteReplicatorName, replicator);
 
         // step-1 remove replicator : it will disconnect the producer but it will wait for callback to be completed
@@ -1813,7 +1816,10 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         ManagedCursor cursor = mock(ManagedCursorImpl.class);
         doReturn(remoteCluster).when(cursor).getName();
         brokerService.getReplicationClients().put(remoteCluster, client);
-        PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
+        PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster,
+                brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster,
+                brokerService.pulsar().getPulsarResources().getClusterResources()
+                        .getCluster(remoteCluster)));
 
         // PersistentReplicator constructor calls startProducer()
         verify(clientImpl)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
index 701ab47..65e9096 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRemoveClusterTest.java
@@ -75,7 +75,9 @@ public class ReplicatorRemoveClusterTest extends ReplicatorTestBase {
 
         admin1.namespaces().createNamespace("pulsar1/ns1", Sets.newHashSet("r1", "r2", "r3"));
 
-        PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3");
+        PulsarClient repClient1 = pulsar1.getBrokerService().getReplicationClient("r3",
+                pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources()
+                .getCluster("r3"));
         Assert.assertNotNull(repClient1);
         Assert.assertFalse(repClient1.isClosed());
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 9b4fcc8..11c8e19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -262,7 +262,9 @@ public class ReplicatorTest extends ReplicatorTestBase {
                 .getOrCreateTopic(topicName.toString()).get();
 
         PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService()
-                .getReplicationClient("r3"));
+                .getReplicationClient("r3",
+                        pulsar1.getBrokerService().pulsar().getPulsarResources().getClusterResources()
+                        .getCluster("r3")));
         final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
         startRepl.setAccessible(true);
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
new file mode 100644
index 0000000..75ce363
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/GeoReplicationTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.tests.integration.topologies.PulsarGeoClusterTestBase;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Geo replication test.
+ */
+@Slf4j
+public class GeoReplicationTest extends PulsarGeoClusterTestBase {
+
+    @BeforeClass(alwaysRun = true)
+    public final void setupBeforeClass() throws Exception {
+        setup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public final void tearDownAfterClass() throws Exception {
+        cleanup();
+    }
+
+    @Test(timeOut = 1000 * 30, dataProvider = "TopicDomain")
+    public void testTopicReplication(String domain) throws Exception {
+        String cluster1 = getGeoCluster().getClusters()[0].getClusterName();
+        String cluster2 = getGeoCluster().getClusters()[1].getClusterName();
+
+        @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder()
+                .serviceHttpUrl(getGeoCluster().getClusters()[0].getHttpServiceUrl())
+                .requestTimeout(30, TimeUnit.SECONDS)
+                .build();
+
+        String topic = domain + "://public/default/testTopicReplication-" + UUID.randomUUID();
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            try {
+                admin.topics().createPartitionedTopic(topic, 10);
+            } catch (Exception e) {
+                log.error("Failed to create partitioned topic {}.", topic, e);
+                Assert.fail("Failed to create partitioned topic " + topic);
+            }
+            Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 10);
+        });
+        log.info("Test geo-replication produce and consume for topic {}.", topic);
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder()
+                .serviceUrl(getGeoCluster().getClusters()[0].getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder()
+                .serviceUrl(getGeoCluster().getClusters()[1].getPlainTextServiceUrl())
+                .build();
+
+        @Cleanup
+        Producer<byte[]> p = client1.newProducer()
+                .topic(topic)
+                .create();
+        log.info("Successfully create producer in cluster {} for topic {}.", cluster1, topic);
+
+        @Cleanup
+        Consumer<byte[]> c = client2.newConsumer()
+                .topic(topic)
+                .subscriptionName("geo-sub")
+                .subscribe();
+        log.info("Successfully create consumer in cluster {} for topic {}.", cluster2, topic);
+
+        for (int i = 0; i < 10; i++) {
+            p.send(String.format("Message [%d]", i).getBytes(StandardCharsets.UTF_8));
+        }
+        log.info("Successfully produce message to cluster {} for topic {}.", cluster1, topic);
+
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> message = c.receive(10, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+        }
+        log.info("Successfully consume message from cluster {} for topic {}.", cluster2, topic);
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
index 043762c..2191799 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java
@@ -70,7 +70,14 @@ public class PulsarCluster {
      * @return the built pulsar cluster
      */
     public static PulsarCluster forSpec(PulsarClusterSpec spec) {
-        return new PulsarCluster(spec);
+        CSContainer csContainer = new CSContainer(spec.clusterName)
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(CSContainer.NAME);
+        return new PulsarCluster(spec, csContainer, false);
+    }
+
+    public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
+        return new PulsarCluster(spec, csContainer, true);
     }
 
     private final PulsarClusterSpec spec;
@@ -80,6 +87,7 @@ public class PulsarCluster {
     private final Network network;
     private final ZKContainer zkContainer;
     private final CSContainer csContainer;
+    private final boolean sharedCsContainer;
     private final Map<String, BKContainer> bookieContainers;
     private final Map<String, BrokerContainer> brokerContainers;
     private final Map<String, WorkerContainer> workerContainers;
@@ -90,11 +98,12 @@ public class PulsarCluster {
     private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
     private final boolean enablePrestoWorker;
 
-    private PulsarCluster(PulsarClusterSpec spec) {
+    private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
 
         this.spec = spec;
+        this.sharedCsContainer = sharedCsContainer;
         this.clusterName = spec.clusterName();
-        this.network = Network.newNetwork();
+        this.network = csContainer.getNetwork();
         this.enablePrestoWorker = spec.enablePrestoWorker();
 
         this.sqlFollowWorkerContainers = Maps.newTreeMap();
@@ -109,26 +118,24 @@ public class PulsarCluster {
         this.zkContainer = new ZKContainer(clusterName);
         this.zkContainer
             .withNetwork(network)
-            .withNetworkAliases(ZKContainer.NAME)
+            .withNetworkAliases(appendClusterName(ZKContainer.NAME))
             .withEnv("clusterName", clusterName)
-            .withEnv("zkServers", ZKContainer.NAME)
+            .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
             .withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
             .withEnv("forceSync", "no")
-            .withEnv("pulsarNode", "pulsar-broker-0");
+            .withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
 
-        this.csContainer = new CSContainer(clusterName)
-            .withNetwork(network)
-            .withNetworkAliases(CSContainer.NAME);
+        this.csContainer = csContainer;
 
         this.bookieContainers = Maps.newTreeMap();
         this.brokerContainers = Maps.newTreeMap();
         this.workerContainers = Maps.newTreeMap();
 
-        this.proxyContainer = new ProxyContainer(clusterName, ProxyContainer.NAME)
+        this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME)
             .withNetwork(network)
-            .withNetworkAliases("pulsar-proxy")
-            .withEnv("zkServers", ZKContainer.NAME)
-            .withEnv("zookeeperServers", ZKContainer.NAME)
+            .withNetworkAliases(appendClusterName("pulsar-proxy"))
+            .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+            .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
             .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
             .withEnv("clusterName", clusterName);
         if (spec.proxyEnvs != null) {
@@ -142,8 +149,8 @@ public class PulsarCluster {
         bookieContainers.putAll(
                 runNumContainers("bookie", spec.numBookies(), (name) -> new BKContainer(clusterName, name)
                         .withNetwork(network)
-                        .withNetworkAliases(name)
-                        .withEnv("zkServers", ZKContainer.NAME)
+                        .withNetworkAliases(appendClusterName(name))
+                        .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
                         .withEnv("useHostNameAsBookieID", "true")
                         // Disable fsyncs for tests since they're slow within the containers
                         .withEnv("journalSyncData", "false")
@@ -156,11 +163,11 @@ public class PulsarCluster {
         // create brokers
         brokerContainers.putAll(
             runNumContainers("broker", spec.numBrokers(), (name) -> {
-                    BrokerContainer brokerContainer = new BrokerContainer(clusterName, name)
+                    BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name))
                         .withNetwork(network)
-                        .withNetworkAliases(name)
-                        .withEnv("zkServers", ZKContainer.NAME)
-                        .withEnv("zookeeperServers", ZKContainer.NAME)
+                        .withNetworkAliases(appendClusterName(name))
+                        .withEnv("zkServers", appendClusterName(ZKContainer.NAME))
+                        .withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
                         .withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
                         .withEnv("clusterName", clusterName)
                         .withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
@@ -235,8 +242,10 @@ public class PulsarCluster {
         log.info("Successfully started local zookeeper container.");
 
         // start the configuration store
-        csContainer.start();
-        log.info("Successfully started configuration store container.");
+        if (!sharedCsContainer) {
+            csContainer.start();
+            log.info("Successfully started configuration store container.");
+        }
 
         // init the cluster
         zkContainer.execCmd(
@@ -335,9 +344,11 @@ public class PulsarCluster {
         if (null != proxyContainer) {
             containers.add(proxyContainer);
         }
-        if (null != csContainer) {
+
+        if (!sharedCsContainer && null != csContainer) {
             containers.add(csContainer);
         }
+
         if (null != zkContainer) {
             containers.add(zkContainer);
         }
@@ -668,4 +679,8 @@ public class PulsarCluster {
             }
         }
     }
+
+    private String appendClusterName(String name) {
+        return sharedCsContainer ? clusterName + "-" + name : name;
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
new file mode 100644
index 0000000..9be3c38
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoCluster.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topologies;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.tests.integration.containers.CSContainer;
+import org.testcontainers.containers.Network;
+
+@Slf4j
+public class PulsarGeoCluster {
+
+    @Getter
+    private final PulsarClusterSpec[] clusterSpecs;
+
+    @Getter
+    private final CSContainer csContainer;
+
+    @Getter
+    private final PulsarCluster[] clusters;
+
+    /**
+     * Pulsar Cluster Spec
+     *
+     * @param specs each pulsar cluster spec.
+     * @return the built pulsar cluster with geo replication
+     */
+    public static PulsarGeoCluster forSpec(PulsarClusterSpec... specs) {
+        return new PulsarGeoCluster(specs);
+    }
+
+    public PulsarGeoCluster(PulsarClusterSpec... clusterSpecs) {
+        this.clusterSpecs = clusterSpecs;
+        this.clusters = new PulsarCluster[clusterSpecs.length];
+
+        this.csContainer = new CSContainer("geo-cluster")
+                .withNetwork(Network.newNetwork())
+                .withNetworkAliases(CSContainer.NAME);
+
+        for (int i = 0; i < this.clusters.length; i++) {
+            clusters[i] = PulsarCluster.forSpec(this.clusterSpecs[i], this.csContainer);
+        }
+    }
+
+    public void start() throws Exception {
+        // start the configuration store
+        this.csContainer.start();
+        log.info("Successfully started configuration store container.");
+
+        for (PulsarCluster cluster : clusters) {
+            cluster.start();
+            log.info("Successfully started all components for cluster {}.", cluster.getClusterName());
+        }
+    }
+
+    public void stop() throws Exception {
+        for (PulsarCluster cluster : clusters) {
+            cluster.stop();
+            log.info("Successfully stopped all components for cluster {}.", cluster.getClusterName());
+        }
+        // stop the configuration store
+        this.csContainer.stop();
+        log.info("Successfully stopped configuration store container.");
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
new file mode 100644
index 0000000..51c74ee
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarGeoClusterTestBase.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.topologies;
+
+import static java.util.stream.Collectors.joining;
+import java.util.stream.Stream;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarGeoClusterTestBase extends PulsarTestBase {
+
+    @Override
+    protected final void setup() throws Exception {
+        setupCluster();
+    }
+
+    @Override
+    protected final void cleanup() throws Exception {
+        tearDownCluster();
+    }
+
+    protected void setupCluster() throws Exception {
+        this.setupCluster("");
+    }
+
+    @Getter
+    private PulsarGeoCluster geoCluster;
+
+    public void setupCluster(String namePrefix) throws Exception {
+        PulsarClusterSpec.PulsarClusterSpecBuilder[] specBuilders = new PulsarClusterSpec.PulsarClusterSpecBuilder[2];
+        for (int i = 0; i < 2; i++) {
+            String clusterName = Stream.of(this.getClass().getSimpleName(), namePrefix, String.valueOf(i),
+                            randomName(5))
+                    .filter(s -> s != null && !s.isEmpty())
+                    .collect(joining("-"));
+            specBuilders[i] = PulsarClusterSpec.builder().clusterName(clusterName);
+        }
+        specBuilders = beforeSetupCluster(specBuilders);
+        PulsarClusterSpec[] specs = new PulsarClusterSpec[2];
+        for (int i = 0; i < specBuilders.length; i++) {
+            specs[i] = specBuilders[i].build();
+        }
+        setupCluster0(specs);
+    }
+
+    protected PulsarClusterSpec.PulsarClusterSpecBuilder[] beforeSetupCluster (
+            PulsarClusterSpec.PulsarClusterSpecBuilder... specBuilder) {
+        return specBuilder;
+    }
+
+    protected void setupCluster0(PulsarClusterSpec... specs) throws Exception {
+        incrementSetupNumber();
+        log.info("Setting up geo cluster with {} local clusters}", specs.length);
+
+        this.geoCluster = PulsarGeoCluster.forSpec(specs);
+
+        beforeStartCluster();
+
+        this.geoCluster.start();
+
+        log.info("Geo Cluster is setup!");
+    }
+
+    protected void beforeStartCluster() throws Exception {
+        // no-op
+    }
+
+    public void tearDownCluster() throws Exception {
+        markCurrentSetupNumberCleaned();
+        if (null != this.geoCluster) {
+            this.geoCluster.stop();
+        }
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
index 9989b15..ebdfbe8 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarTestBase.java
@@ -34,9 +34,18 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 
 public abstract class PulsarTestBase extends TestRetrySupport {
 
+    @DataProvider(name = "TopicDomain")
+    public Object[][] topicDomain() {
+        return new Object[][] {
+                {"persistent"},
+                {"non-persistent"}
+        };
+    }
+
     public static String randomName(int numChars) {
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < numChars; i++) {
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index aa31852..6421561 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -24,6 +24,7 @@
         <classes>
             <class name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
+            <class name="org.apache.pulsar.tests.integration.messaging.GeoReplicationTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.admin.AdminTest" />
             <class name="org.apache.pulsar.tests.integration.io.sources.AvroKafkaSourceTest" />