You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:56:04 UTC

[pulsar] 07/13: [Broker] Fix create partitioned topic in replicated namespace (#10963)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c019c2a113a555c25fd65a6a450649ba25d8a1b4
Author: ran <ga...@126.com>
AuthorDate: Tue Jun 22 07:31:19 2021 +0800

    [Broker] Fix create partitioned topic in replicated namespace (#10963)
    
    Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
    
    ### Motivation
    
    Currently, create a partitioned topic in the replicated namespace will not create metadata path `/managed-ledgers` on replicated clusters.
    
    ### Modifications
    
    Add a new flag `createLocalTopicOnly` to indicate whether create the partitioned path in replicated clusters or not.
    If the flag is false, make remote calls to create partitioned topics on replicated clusters.
    
    
    (cherry picked from commit 2f8c175f52a1731f794c741f8fc3347b680191eb)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 120 ++++++++++++++-------
 .../broker/admin/v1/NonPersistentTopics.java       |  15 +--
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  15 +--
 .../broker/admin/v2/NonPersistentTopics.java       |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   5 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  27 ++---
 .../pulsar/broker/service/ReplicatorTest.java      |  62 ++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |   8 +-
 9 files changed, 188 insertions(+), 72 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 8806497..530b350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -619,7 +620,8 @@ public abstract class AdminResource extends PulsarWebResource {
         return topicPartitions;
     }
 
-    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
+                                                  boolean createLocalTopicOnly) {
         Integer maxTopicsPerNamespace = null;
 
         try {
@@ -672,55 +674,57 @@ public abstract class AdminResource extends PulsarWebResource {
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);
+                    });
         }).exceptionally(ex -> {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
             resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
+
+        if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+            getNamespaceReplicatedClusters(namespaceName)
+                    .stream()
+                    .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+                    .forEach(cluster -> createFutureList.add(
+                            ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+                                    .createPartitionedTopicAsync(
+                                            topicName.getPartitionedTopicName(), numPartitions, true)));
+        }
+
+        FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
+                if (ex.getCause() instanceof RestException) {
+                    asyncResponse.resume(ex.getCause());
+                } else {
+                    resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                }
+                return;
+            }
+            log.info("[{}] Successfully created partitions for topic {} in cluster {}",
+                    clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
+            asyncResponse.resume(Response.noContent().build());
+        });
     }
 
     /**
@@ -747,6 +751,42 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
+    private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse,
+                                                                  int numPartitions,
+                                                                  boolean createLocalTopicOnly) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
+        namespaceResources()
+                .getPartitionedTopicResources()
+                .createAsync(partitionedTopicPath, new PartitionedTopicMetadata(numPartitions))
+                .whenComplete((ignored, ex) -> {
+                    if (ex != null) {
+                        if (ex instanceof AlreadyExistsException) {
+                            if (createLocalTopicOnly) {
+                                future.complete(null);
+                                return;
+                            }
+                            log.warn("[{}] Failed to create already existing partitioned topic {}",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                        } else if (ex instanceof BadVersionException) {
+                            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, "Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
+                            future.completeExceptionally(new RestException(ex.getCause()));
+                        }
+                        return;
+                    }
+                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                    future.complete(null);
+                });
+        return future;
+    }
+
     protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
         if (throwable instanceof WebApplicationException) {
             asyncResponse.resume((WebApplicationException) throwable);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 20fd24d..daf6ea0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -142,14 +142,17 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
                     + " to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
-                                               String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 46708dd..babff0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -154,14 +154,17 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 406, message = "The number of partitions should be "
                     + "more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
-                                                   String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index cd6b31c..9d29967 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -188,12 +188,12 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
-
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validateTopicName(tenant, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9772757..56e5ce1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -231,12 +231,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index db6caca..81ec0d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -785,7 +785,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 39a99d2..a1c7276 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -169,7 +170,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) Create the partitioned topic
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -289,7 +290,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 3) Create the partitioned topic
         AsyncResponse response  = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -375,7 +376,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
@@ -399,7 +400,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10);
@@ -428,7 +429,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
@@ -458,13 +459,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -494,7 +495,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testCreateExistedPartition() {
         final AsyncResponse response = mock(AsyncResponse.class);
         final String topicName = "test-create-existed-partition";
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true);
 
         final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
         try {
@@ -513,7 +514,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -553,7 +555,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
@@ -596,7 +599,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // create partitioned topic and compaction on it
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true);
         persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -613,7 +616,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 topicName).toString();
         final String subscriptionName = "sub";
 
-        admin.topics().createPartitionedTopic(topic, 3);
+        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, true).get();
 
         final String partitionedTopic = topic + "-partition-0";
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d261e85..0ea39aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -35,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.SortedSet;
@@ -62,6 +64,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -79,7 +82,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -1121,6 +1123,64 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = newUniqueName("pulsar/ns");
+
+        final String persistentPartitionedTopic =
+                newUniqueName("persistent://" + namespace + "/partitioned");
+        final String persistentNonPartitionedTopic =
+                newUniqueName("persistent://" + namespace + "/non-partitioned");
+        final String nonPersistentPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + "/partitioned");
+        final String nonPersistentNonPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + "/non-partitioned");
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // expected topic list didn't contain non-persistent-non-partitioned topic,
+        // because this model topic didn't create path in local metadata store.
+        List<String> expectedTopicList = Lists.newArrayList(
+                persistentNonPartitionedTopic, nonPersistentNonPartitionedTopic);
+        TopicName pt = TopicName.get(persistentPartitionedTopic);
+        for (int i = 0; i < numPartitions; i++) {
+            expectedTopicList.add(pt.getPartition(i).toString());
+        }
+
+        checkListContainExpectedTopic(admin1, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin2, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
+    }
+
+    private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
+        // wait non-partitioned topics replicators created finished
+        final List<String> list = new ArrayList<>();
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            list.clear();
+            list.addAll(admin.topics().getList(namespace));
+            return list.size() == expectedTopicList.size();
+        });
+        for (String expectTopic : expectedTopicList) {
+            Assert.assertTrue(list.contains(expectTopic));
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 267a2a0..0dcf7fa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -384,9 +384,15 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
+        return createPartitionedTopicAsync(topic, numPartitions, false);
+    }
+
+    public CompletableFuture<Void> createPartitionedTopicAsync(
+            String topic, int numPartitions, boolean createLocalTopicOnly) {
         checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
         TopicName tn = validateTopic(topic);
-        WebTarget path = topicPath(tn, "partitions");
+        WebTarget path = topicPath(tn, "partitions")
+                .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
         return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }