You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/30 02:27:38 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7] Fix create partitioned topic in replicated namespace (#11140)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 0fb9c52  [Branch-2.7] Fix create partitioned topic in replicated namespace (#11140)
0fb9c52 is described below

commit 0fb9c52cf07edc647359fcb84939363e0fc26ca4
Author: ran <ga...@126.com>
AuthorDate: Wed Jun 30 10:26:51 2021 +0800

    [Branch-2.7] Fix create partitioned topic in replicated namespace (#11140)
    
    Cherry-picking PR https://github.com/apache/pulsar/pull/10963 to branch-2.7 produces many conflicts, so this separate PR applies the same fix directly to branch-2.7.
    
    Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
    
    ### Motivation
    
    Currently, creating a partitioned topic in a replicated namespace does not create the metadata path `/managed-ledgers` on the replicated clusters.
    
    ### Modifications
    
    Add a new flag `createLocalTopicOnly` that indicates whether the partitioned topic should be created only on the local cluster or on the replicated clusters as well.
    If the flag is false, the broker makes remote calls to create the partitioned topic on the replicated clusters.
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 130 ++++++++++++++-------
 .../broker/admin/v1/NonPersistentTopics.java       |  15 ++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  13 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |   5 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   5 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  20 ++--
 .../pulsar/broker/service/ReplicatorTest.java      |  57 +++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |   8 +-
 9 files changed, 188 insertions(+), 67 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 5c87776..3e26aa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.Constants;
@@ -811,7 +812,8 @@ public abstract class AdminResource extends PulsarWebResource {
         return partitionedTopics;
     }
 
-    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
+                                                  boolean createLocalTopicOnly) {
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         try {
             validateAdminAccessForTenant(topicName.getTenant());
@@ -828,56 +830,102 @@ public abstract class AdminResource extends PulsarWebResource {
             asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
+                createLocalFuture.completeExceptionally(
+                        new RestException(Status.CONFLICT, "This topic already exists"));
+                return;
+            }
 
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-                    zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
-                        if (KeeperException.Code.OK.intValue() == rc) {
-                            globalZk().sync(path, (rc2, s2, ctx) -> {
-                                if (KeeperException.Code.OK.intValue() == rc2) {
-                                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                    tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                        log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
-                                        asyncResponse.resume(Response.noContent().build());
-                                    }).exceptionally(e -> {
-                                        log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
-                                        // The partitioned topic is created but there are some partitions create failed
-                                        asyncResponse.resume(new RestException(e));
-                                        return null;
-                                    });
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
-                                    asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
-                                }
-                            }, null);
-                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
-                            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                        } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
-                            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
-                                    topicName);
-                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                        } else {
-                            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
-                            asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+            provisionPartitionedTopicPath(numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
                         }
+                        createLocalFuture.complete(null);
                     });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
-            }
         }).exceptionally(ex -> {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            createLocalFuture.completeExceptionally(ex);
             return null;
         });
+
+        if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+            getNamespaceReplicatedClusters(namespaceName)
+                    .stream()
+                    .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+                    .forEach(cluster -> createFutureList.add(
+                            ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+                                    .createPartitionedTopicAsync(
+                                            topicName.getPartitionedTopicName(), numPartitions, true)));
+        }
+
+        FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
+                if (ex.getCause() instanceof RestException) {
+                    asyncResponse.resume(ex.getCause());
+                } else {
+                    resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                }
+                return;
+            }
+            log.info("[{}] Successfully created partitions for topic {} in cluster {}",
+                    clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
+            asyncResponse.resume(Response.noContent().build());
+        });
+    }
+
+    private CompletableFuture<Void> provisionPartitionedTopicPath(int numPartitions, boolean createLocalTopicOnly) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            String path = ZkAdminPaths.partitionedTopicPath(topicName);
+            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
+            zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
+                if (KeeperException.Code.OK.intValue() == rc) {
+                    globalZk().sync(path, (rc2, s2, ctx) -> {
+                        if (KeeperException.Code.OK.intValue() == rc2) {
+                            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                            future.complete(null);
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic {}",
+                                    clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+                            future.completeExceptionally(
+                                    new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                        }
+                    }, null);
+                } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                    if (createLocalTopicOnly) {
+                        future.complete(null);
+                        return;
+                    }
+                    log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                    future.completeExceptionally(
+                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
+                            topicName);
+                    future.completeExceptionally(new RestException(Status.CONFLICT, "Concurrent modification"));
+                } else {
+                    log.error("[{}] Failed to create partitioned topic {}",
+                            clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+                    future.completeExceptionally(
+                            new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+                }
+            });
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            future.completeExceptionally(e);
+        }
+        return future;
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 06a9f92..b872f64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -139,14 +139,17 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
                     + " to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
-                                               String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 1ba1467..4038dcb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -150,12 +150,17 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
-            int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 769e60d..26a9fe2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -182,13 +182,14 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
-            int numPartitions) {
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
 
         try {
             validateGlobalNamespaceOwnership(tenant,namespace);
             validateTopicName(tenant, namespace, encodedTopic);
             validateAdminAccessForTenant(topicName.getTenant());
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d7a1805..9267c28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -216,12 +216,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
-            int numPartitions) {
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant,namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateAdminAccessForTenant(topicName.getTenant());
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index c80e727..2eab88f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -724,7 +724,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 0a208a8..485b42c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -172,7 +172,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) Create the partitioned topic
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -293,7 +293,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 3) Create the partitioned topic
         AsyncResponse response  = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -378,7 +378,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
@@ -402,7 +402,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10);
@@ -431,7 +431,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
@@ -461,13 +461,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -499,7 +499,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -539,7 +539,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
@@ -582,7 +582,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // create partitioned topic and compaction on it
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true);
         persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index 75e214f..f3e90b1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -35,6 +35,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.SortedSet;
@@ -58,9 +59,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -85,6 +88,7 @@ import org.awaitility.Awaitility;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -1065,6 +1069,59 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         consumer.close();
     }
+
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = "pulsar/ns-" + RandomStringUtils.randomAlphabetic(5).toLowerCase();
+
+        final String persistentPartitionedTopic = "persistent://" + namespace + "/partitioned-topic";
+        final String persistentNonPartitionedTopic = "persistent://" + namespace + "/non-partitioned-topic";
+        final String nonPersistentPartitionedTopic = "non-persistent://" + namespace + "/partitioned-topic";
+        final String nonPersistentNonPartitionedTopic = "non-persistent://" + namespace + "/non-partitioned-topic";
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // The expected topic list does not contain the partitions of the non-persistent partitioned topic,
+        // because this kind of topic does not create a path in the local metadata store.
+        List<String> expectedTopicList = Lists.newArrayList(
+                persistentNonPartitionedTopic, nonPersistentNonPartitionedTopic);
+        TopicName pt = TopicName.get(persistentPartitionedTopic);
+        for (int i = 0; i < numPartitions; i++) {
+            expectedTopicList.add(pt.getPartition(i).toString());
+        }
+
+        checkListContainExpectedTopic(admin1, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin2, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
+    }
+
+    private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
+        // wait non-partitioned topics replicators created finished
+        final List<String> list = new ArrayList<>();
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            list.clear();
+            list.addAll(admin.topics().getList(namespace));
+            return list.size() == expectedTopicList.size();
+        });
+        for (String expectTopic : expectedTopicList) {
+            Assert.assertTrue(list.contains(expectTopic));
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 9f2a911..c68b0bc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -366,9 +366,15 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
+        return createPartitionedTopicAsync(topic, numPartitions, false);
+    }
+
+    public CompletableFuture<Void> createPartitionedTopicAsync(
+            String topic, int numPartitions, boolean createLocalTopicOnly) {
         checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
         TopicName tn = validateTopic(topic);
-        WebTarget path = topicPath(tn, "partitions");
+        WebTarget path = topicPath(tn, "partitions")
+                .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
         return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }