You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:18:48 UTC

[GitHub] [pulsar] gaoran10 opened a new pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

gaoran10 opened a new pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963


   Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
   
   ### Motivation
   
   Currently, create a partitioned topic in the replicated namespace will not create metadata path `/managed-ledgers` on replicated clusters.
   
   ### Modifications
   
   Add a new flag `createLocalTopicOnly` to indicate whether create the partitioned path in replicated clusters or not.
   If the flag is false, make remote calls to create partitioned topics on replicated clusters.
   
   ### Verifying this change
   
   Add a new test.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r655245674



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    public static String randomSuffixString(String content, int numSuffix) {
+        return content + "-" + RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();

Review comment:
       we can enhance newUniqueName in a follow up patch if you prefer.
   at the moment it is better to have only one single way of generating random topic names
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654284341



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    public static String randomSuffixString(String content, int numSuffix) {
+        return content + "-" + RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();

Review comment:
       why newUniqueName is not a good fit for you ?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
 
+    /**
+     * Create a partitioned topic.
+     * <p/>
+     * Create a partitioned topic. It needs to be called before creating a producer for a partitioned topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of partitions to create of the topic
+     * @param createLocalTopicOnly
+     *            False indicate create topic in all replicate clusters,
+     *            true indicate only create topic in local cluster.
+     * @throws PulsarAdminException
+     */
+    void createPartitionedTopic(

Review comment:
       this is a new public API for users.
   
   I am not sure we should add this here.
   
   it is fine to have it in the Implementation but we should not give this to the users
   
   @merlimat WDYT ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654156171



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
##########
@@ -1004,6 +1004,59 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = randomSuffixString("pulsar/ns", 5);
+
+        final String persistentPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/partitioned", 5);
+        final String persistentNonPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/non-partitioned", 5);
+        final String nonPersistentPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/partitioned", 5);
+        final String nonPersistentNonPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/non-partitioned", 5);
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // wait non-partitioned topics replicators created finished
+        Thread.sleep(1000);

Review comment:
       Flaky test is easy to appear in this way, can we use Awaitility?
   `admin.topics().getList().size > x` can be used as the judgment condition

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
##########
@@ -1004,6 +1004,59 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = randomSuffixString("pulsar/ns", 5);
+
+        final String persistentPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/partitioned", 5);
+        final String persistentNonPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/non-partitioned", 5);
+        final String nonPersistentPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/partitioned", 5);
+        final String nonPersistentNonPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/non-partitioned", 5);
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // wait non-partitioned topics replicators created finished
+        Thread.sleep(1000);

Review comment:
       Flaky test is easy to appear in this way, can we use Awaitility?
   `admin.topics(). getPartitionedTopicList().size > x` can be used as the judgment condition




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654312778



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));

Review comment:
       if local exist but remote cluster don't have, do we need to check this logical?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);

Review comment:
       `createFutureList.add(tryCreatePartitionsAsync(numPartitions))`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#issuecomment-864919993


   > LGTM
   > 
   > please switch to using newUniqueName in the tests
   
   Ok, I'll fix this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654266958



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
##########
@@ -1004,6 +1004,59 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = randomSuffixString("pulsar/ns", 5);
+
+        final String persistentPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/partitioned", 5);
+        final String persistentNonPartitionedTopic =
+                randomSuffixString("persistent://" + namespace + "/non-partitioned", 5);
+        final String nonPersistentPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/partitioned", 5);
+        final String nonPersistentNonPartitionedTopic =
+                randomSuffixString("non-persistent://" + namespace + "/non-partitioned", 5);
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // wait non-partitioned topics replicators created finished
+        Thread.sleep(1000);

Review comment:
       Ok, I'll fix this. Thanks.

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    public static String randomSuffixString(String content, int numSuffix) {
+        return content + "-" + RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();

Review comment:
       The method `newUniqueName ` could work well, but the UUID suffix is hard to read and check, maybe a random alphabetic string is enough. WDYT?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
 
+    /**
+     * Create a partitioned topic.
+     * <p/>
+     * Create a partitioned topic. It needs to be called before creating a producer for a partitioned topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of partitions to create of the topic
+     * @param createLocalTopicOnly
+     *            False indicate create topic in all replicate clusters,
+     *            true indicate only create topic in local cluster.
+     * @throws PulsarAdminException
+     */
+    void createPartitionedTopic(

Review comment:
       Yes, I think this should be a broker internal behavior, the param `createLocalTopicOnly` shouldn't be specified by users. @codelipenghui Please confirm.

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
 
+    /**
+     * Create a partitioned topic.
+     * <p/>
+     * Create a partitioned topic. It needs to be called before creating a producer for a partitioned topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of partitions to create of the topic
+     * @param createLocalTopicOnly
+     *            False indicate create topic in all replicate clusters,
+     *            true indicate only create topic in local cluster.
+     * @throws PulsarAdminException
+     */
+    void createPartitionedTopic(

Review comment:
       Yes, I think this should be a broker internal behavior, the param `createLocalTopicOnly` shouldn't be specified by users.
   
   @codelipenghui Please confirm.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);

Review comment:
       I'm not sure about this comment.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -672,55 +673,57 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));

Review comment:
       This method is used to create a partitioned topic, if the topic already exists in the local cluster, then the request should be rejected.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#issuecomment-864641696


   @gaoran10 Could you please help check the comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r654318386



##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -361,6 +361,42 @@
      */
     CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions);
 
+    /**
+     * Create a partitioned topic.
+     * <p/>
+     * Create a partitioned topic. It needs to be called before creating a producer for a partitioned topic.
+     * <p/>
+     *
+     * @param topic
+     *            Topic name
+     * @param numPartitions
+     *            Number of partitions to create of the topic
+     * @param createLocalTopicOnly
+     *            False indicate create topic in all replicate clusters,
+     *            true indicate only create topic in local cluster.
+     * @throws PulsarAdminException
+     */
+    void createPartitionedTopic(

Review comment:
       Yes, we don't need to expose to users, keep it in the implementation is a better solution




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] gaoran10 commented on pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#issuecomment-864919993


   > LGTM
   > 
   > please switch to using newUniqueName in the tests
   
   Ok, I'll fix this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#discussion_r655245674



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
##########
@@ -29,4 +31,8 @@ public static String newUniqueName(String prefix) {
         return prefix + "-" + UUID.randomUUID();
     }
 
+    public static String randomSuffixString(String content, int numSuffix) {
+        return content + "-" + RandomStringUtils.randomAlphabetic(numSuffix).toLowerCase();

Review comment:
       we can enhance newUniqueName in a follow up patch if you prefer.
   at the moment it is better to have only one single way of generating random topic names
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963#issuecomment-869153894


   @gaoran10 Looks not able to cherry-pick to branch-2.7 directly, could you please help push a PR based on branch-2.7?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #10963: [Broker] Fix create partitioned topic in replicated namespace

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10963:
URL: https://github.com/apache/pulsar/pull/10963


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org