Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/07 06:58:06 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Technoboy- opened a new pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141


   ### Motivation
   
   Make PersistentTopicsBase#internalUpdatePartitionedTopic async.
   
   ### Documentation
   
   - [x] `no-need-doc` 
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r801631498



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
##########
@@ -2265,7 +2265,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
 
         try {
             admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
-        } catch (PulsarAdminException.PreconditionFailedException e) {
+        } catch (Exception e) {

Review comment:
       Has the exception changed?







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800391619



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                CompletableFuture<Void> ret;
+                if (!updateLocalTopicOnly && !force) {
+                    ret = validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    ret = CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
-                            updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
+                return ret;
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal to " + maxPartitions);
+                }
+                // Only do the validation if it's the first hop.
+                if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
+                    Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
+                    if (!clusters.contains(pulsar().getConfig().getClusterName())) {
+                        log.error("[{}] local cluster is not part of replicated cluster for namespace {}",
+                                clientAppId(), topicName);
+                        throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster"
+                                + " list");
                     }
-                    throw new RestException(e);
+                    return tryCreatePartitionsAsync(numPartitions)
+                            .thenCompose(ignore -> createSubscriptions(topicName, numPartitions))
+                            .thenCompose(ignore -> {
+                                CompletableFuture<Void> ret;

Review comment:
       right. good.







[GitHub] [pulsar] mattisonchao commented on pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#issuecomment-1031143932


   @Technoboy- Please add the tag `components/management`.
   
   Thanks~





[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800408253



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -228,15 +228,33 @@ public void createNonPartitionedTopic(
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0"
                     + " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
-    public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void updatePartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
+                    .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        Throwable cause = ex.getCause();

Review comment:
       ok.







[GitHub] [pulsar] mattisonchao edited a comment on pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao edited a comment on pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#issuecomment-1031143932


   @Technoboy- Please add the tag `components/management`.
   
   Thanks~





[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800458474



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions

Review comment:
       yes.







[GitHub] [pulsar] Technoboy- commented on pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#issuecomment-1032163663


   /pulsarbot run-failure-checks





[GitHub] [pulsar] mattisonchao commented on a change in pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r801621302



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -228,15 +228,27 @@ public void createNonPartitionedTopic(
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0"
                     + " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
-    public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void updatePartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
+                    .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception e) {

Review comment:
       It looks like we only need to catch `validateTopicName(property, cluster, namespace, encodedTopic);`, right?
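
A minimal sketch of the narrower `try` block this comment suggests, using only the JDK. Every name here (`validateName`, `updateAsync`, `handle`, the `respond` callback) is a hypothetical stand-in and not the Pulsar API. It assumes the async method reports all of its failures through the returned future and never throws synchronously, which would need to be confirmed for the real method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class EndpointErrorPathsSketch {

    // Hypothetical synchronous validation; throws on bad input.
    static void validateName(String topic) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("invalid topic name");
        }
    }

    // Hypothetical async operation; failures are reported through the returned future.
    static CompletableFuture<Void> updateAsync(int numPartitions) {
        if (numPartitions <= 0) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("partitions must be > 0"));
            return failed;
        }
        return CompletableFuture.completedFuture(null);
    }

    // The try/catch covers only the synchronous step; async failures flow to exceptionally().
    static void handle(String topic, int numPartitions, Consumer<String> respond) {
        try {
            validateName(topic);
        } catch (RuntimeException e) {
            respond.accept("sync-error: " + e.getMessage());
            return;
        }
        updateAsync(numPartitions)
                .thenAccept(ignored -> respond.accept("ok"))
                .exceptionally(ex -> {
                    // The dependent stage wraps the original failure in a CompletionException.
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    respond.accept("async-error: " + cause.getMessage());
                    return null;
                });
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        handle("", 3, out::add);
        handle("t", 0, out::add);
        handle("t", 3, out::add);
        out.forEach(System.out::println);
    }
}
```

If the async method can also throw before it returns a future, the wider `try` block in the diff remains the safer choice.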







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r801643597



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
##########
@@ -2265,7 +2265,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
 
         try {
             admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
-        } catch (PulsarAdminException.PreconditionFailedException e) {
+        } catch (Exception e) {

Review comment:
       Yes. Because we changed `updatePartitionedTopic` from sync to async, the exception is now wrapped in a `CompletionException`.
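
For context, a small JDK-only sketch of the wrapping behavior mentioned here. `PreconditionFailed` is a hypothetical stand-in for the real exception type, and the sketch says nothing about what the Pulsar admin client surfaces to the test. It only shows that an exception thrown inside a `CompletableFuture` stage reaches `join()` as a `CompletionException`, and that the original is still available as the cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionUnwrapSketch {

    // Hypothetical stand-in for a domain exception such as a precondition failure.
    static final class PreconditionFailed extends RuntimeException {
        PreconditionFailed(String message) {
            super(message);
        }
    }

    // Returns the underlying cause when a CompletableFuture stage wrapped it.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Runs a one-stage chain whose stage throws, and returns what join() surfaces.
    static Throwable failureSeenByJoin() {
        CompletableFuture<Void> future = CompletableFuture.completedFuture(1)
                .thenAccept(ignored -> {
                    throw new PreconditionFailed("partition count mismatch");
                });
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable seen = failureSeenByJoin();
        System.out.println(seen.getClass().getSimpleName());         // CompletionException
        System.out.println(unwrap(seen).getClass().getSimpleName()); // PreconditionFailed
    }
}
```

A test that wants to keep asserting on the specific type could unwrap the cause this way instead of widening the `catch` to `Exception`, assuming the wrapped exception really is what reaches the caller.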







[GitHub] [pulsar] github-actions[bot] commented on pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#issuecomment-1069833339


   The PR had no activity for 30 days; marked with the Stale label.





[GitHub] [pulsar] mattisonchao commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800464850



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -415,88 +427,66 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
      *
      * @param numPartitions
+     * @param updateLocalTopicOnly
+     * @param authoritative
+     * @param force
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                if (!updateLocalTopicOnly && !force) {
+                    return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    return CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
-                            updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
-                    }
-                    throw new RestException(e);
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal to " + maxPartitions);
                 }
-            }
-            return;
-        }
-
-        try {
-            tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
-                    TimeUnit.SECONDS);
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+                CompletableFuture<Void> ret;

Review comment:
       It looks unused. :(







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800391437



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                CompletableFuture<Void> ret;
+                if (!updateLocalTopicOnly && !force) {
+                    ret = validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);

Review comment:
       yes, right.







[GitHub] [pulsar] mattisonchao commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800360060



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -175,6 +175,22 @@
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync() {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> {
+                    try {
+                        if (!namespaceResources().namespaceExists(namespaceName)) {

Review comment:
       This method can be async.
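
One way to read this suggestion, as a JDK-only sketch: replace the blocking existence check inside the `thenCompose` lambda with a call that itself returns a future, so no `try`/`catch` around a blocking call is needed. `existsAsync`, `listTopicsAsync`, and `getListAsync` are hypothetical names for illustration, not the Pulsar resource API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncExistenceCheckSketch {

    // Hypothetical async lookup standing in for a metadata-store existence check.
    static CompletableFuture<Boolean> existsAsync(String namespace) {
        return CompletableFuture.completedFuture(!namespace.startsWith("missing"));
    }

    // Hypothetical async listing of the topics in a namespace.
    static CompletableFuture<List<String>> listTopicsAsync(String namespace) {
        return CompletableFuture.completedFuture(Arrays.asList(namespace + "/t1", namespace + "/t2"));
    }

    // Composes the existence check and the listing without blocking in between.
    static CompletableFuture<List<String>> getListAsync(String namespace) {
        return existsAsync(namespace).thenCompose(exists -> {
            if (!exists) {
                CompletableFuture<List<String>> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("Namespace does not exist"));
                return failed;
            }
            return listTopicsAsync(namespace);
        });
    }

    public static void main(String[] args) {
        System.out.println(getListAsync("ns").join());
        System.out.println(getListAsync("missing-ns").isCompletedExceptionally());
    }
}
```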

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                CompletableFuture<Void> ret;
+                if (!updateLocalTopicOnly && !force) {
+                    ret = validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    ret = CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
-                            updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
+                return ret;
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal to " + maxPartitions);
+                }
+                // Only do the validation if it's the first hop.
+                if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
+                    Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());

Review comment:
       This method can be async. 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -228,15 +228,33 @@ public void createNonPartitionedTopic(
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0"
                     + " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
-    public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void updatePartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
+                    .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        Throwable cause = ex.getCause();

Review comment:
       You can also use ``resumeAsyncResponseExceptionally`` here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -796,9 +797,25 @@ public void updatePartitionedTopic(
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
+            internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
+                    .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        Throwable cause = ex.getCause();

Review comment:
       You can also use ``resumeAsyncResponseExceptionally`` here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions

Review comment:
       Maybe we need to update the ``@param`` tags?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -796,9 +797,25 @@ public void updatePartitionedTopic(
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
                     int numPartitions) {
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);

Review comment:
       This method can be async.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                CompletableFuture<Void> ret;
+                if (!updateLocalTopicOnly && !force) {
+                    ret = validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);

Review comment:
       I think it would be clearer to use ``return`` directly.
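
       A minimal sketch of what returning directly from the lambda could look like. ``validateAsync`` is a hypothetical stand-in for ``validatePartitionTopicUpdateAsync``, and the class is only an illustration, not the code in this PR:

```java
import java.util.concurrent.CompletableFuture;

public class DirectReturnSketch {

    // Hypothetical stand-in for validatePartitionTopicUpdateAsync:
    // fails the future when numPartitions is not positive.
    static CompletableFuture<Void> validateAsync(int numPartitions) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (numPartitions > 0) {
            result.complete(null);
        } else {
            result.completeExceptionally(
                    new IllegalArgumentException("Number of partitions should be more than 0"));
        }
        return result;
    }

    static CompletableFuture<Void> update(boolean updateLocalTopicOnly, boolean force, int numPartitions) {
        return CompletableFuture.<Void>completedFuture(null)
                .thenCompose(ignored -> {
                    // Each branch returns its future directly, so no temporary "ret" variable is needed.
                    if (!updateLocalTopicOnly && !force) {
                        return validateAsync(numPartitions);
                    }
                    return CompletableFuture.completedFuture(null);
                });
    }

    public static void main(String[] args) {
        System.out.println(update(false, false, 3).isCompletedExceptionally());
        System.out.println(update(false, false, -1).isCompletedExceptionally());
    }
}
```

       Both branches still produce a ``CompletableFuture<Void>``, so the behavior should be the same as with the ``ret`` variable.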

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -416,87 +432,60 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      *
      * @param numPartitions
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                CompletableFuture<Void> ret;
+                if (!updateLocalTopicOnly && !force) {
+                    ret = validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    ret = CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
-                            updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
+                return ret;
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal to " + maxPartitions);
+                }
+                // Only do the validation if it's the first hop.
+                if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
+                    Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
+                    if (!clusters.contains(pulsar().getConfig().getClusterName())) {
+                        log.error("[{}] local cluster is not part of replicated cluster for namespace {}",
+                                clientAppId(), topicName);
+                        throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster"
+                                + " list");
                     }
-                    throw new RestException(e);
+                    return tryCreatePartitionsAsync(numPartitions)
+                            .thenCompose(ignore -> createSubscriptions(topicName, numPartitions))
+                            .thenCompose(ignore -> {
+                                CompletableFuture<Void> ret;

Review comment:
       I think it would be clearer to use ``return`` directly.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -175,6 +175,22 @@
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync() {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> {
+                    try {
+                        if (!namespaceResources().namespaceExists(namespaceName)) {
+                            throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+                        }
+                    } catch (MetadataStoreException e) {

Review comment:
       If we change ``namespaceResources().namespaceExists(namespaceName)`` to async, we can remove this catch.
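
       A rough sketch of the idea, assuming an async existence check is available. ``namespaceExistsAsync`` below is a hypothetical helper written for this example, and the exception types and topic names are placeholders:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncExistsSketch {

    // Hypothetical async existence check. A metadata store failure is reported
    // through the returned future instead of a checked exception.
    static CompletableFuture<Boolean> namespaceExistsAsync(String namespace) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        if (namespace == null) {
            result.completeExceptionally(new IllegalStateException("metadata store unavailable"));
        } else {
            result.complete(namespace.equals("public/default"));
        }
        return result;
    }

    static CompletableFuture<List<String>> getListAsync(String namespace) {
        // No try/catch here: a failed existence check fails the whole chain.
        return namespaceExistsAsync(namespace).thenApply(exists -> {
            if (!exists) {
                throw new IllegalArgumentException("Namespace does not exist");
            }
            return Arrays.asList("topic-a", "topic-b");
        });
    }

    public static void main(String[] args) {
        System.out.println(getListAsync("public/default").join());
        System.out.println(getListAsync("missing/ns").isCompletedExceptionally());
    }
}
```

       With this shape, store errors and the "does not exist" case both reach the caller as a failed future, so the response handling can live in one place.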







[GitHub] [pulsar] mattisonchao edited a comment on pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
mattisonchao edited a comment on pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#issuecomment-1031143932


   @Technoboy- Please add the ``component/management`` tag.
   
   Thanks~





[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800492894



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -415,88 +427,66 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<Stri
      * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
      *
      * @param numPartitions
+     * @param updateLocalTopicOnly
+     * @param authoritative
+     * @param force
      */
-    protected void internalUpdatePartitionedTopic(int numPartitions,
-                                                  boolean updateLocalTopicOnly, boolean authoritative,
-                                                  boolean force) {
+    protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions,
+                                                                          boolean updateLocalTopicOnly,
+                                                                          boolean authoritative, boolean force) {
         if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
+            return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE,
+                    "Number of partitions should be more than 0"));
         }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-        // Only do the validation if it's the first hop.
-        if (!updateLocalTopicOnly && !force) {
-            validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
-        }
-        final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
-        if (maxPartitions > 0 && numPartitions > maxPartitions) {
-            throw new RestException(Status.NOT_ACCEPTABLE,
-                    "Number of partitions should be less than or equal to " + maxPartitions);
-        }
-
-        if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
-            Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
-            if (!clusters.contains(pulsar().getConfig().getClusterName())) {
-                log.error("[{}] local cluster is not part of replicated cluster for namespace {}", clientAppId(),
-                        topicName);
-                throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate cluster list");
-            }
-            try {
-                tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                createSubscriptions(topicName, numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                if (e.getCause() instanceof RestException) {
-                    throw (RestException) e.getCause();
+        return validateTopicOwnershipAsync(topicName, authoritative)
+            .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION,
+                    PolicyOperation.WRITE))
+            .thenCompose(__ -> {
+                if (!updateLocalTopicOnly && !force) {
+                    return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions);
+                }  else {
+                    return CompletableFuture.completedFuture(null);
                 }
-                log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-            // if this cluster is the first hop which needs to coordinate with other clusters then update partitions in
-            // other clusters and then update number of partitions.
-            if (!updateLocalTopicOnly) {
-                CompletableFuture<Void> updatePartition = new CompletableFuture<>();
-                updatePartitionInOtherCluster(numPartitions, clusters).thenRun(() -> {
-                    try {
-                        namespaceResources().getPartitionedTopicResources()
-                                .updatePartitionedTopicAsync(topicName, p ->
-                            new PartitionedTopicMetadata(numPartitions)
-                        ).thenAccept(r -> updatePartition.complete(null)).exceptionally(ex -> {
-                            updatePartition.completeExceptionally(ex.getCause());
-                            return null;
-                        });
-                    } catch (Exception e) {
-                        updatePartition.completeExceptionally(e);
-                    }
-                }).exceptionally(ex -> {
-                    updatePartition.completeExceptionally(ex);
-                    return null;
-                });
-                try {
-                    updatePartition.get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-                } catch (Exception e) {
-                    log.error("{} Failed to update number of partitions in zk for topic {} and partitions {}",
-                            clientAppId(), topicName, numPartitions, e);
-                    if (e.getCause() instanceof RestException) {
-                        throw (RestException) e.getCause();
-                    }
-                    throw new RestException(e);
+            })
+            .thenCompose(__ -> {
+                final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
+                if (maxPartitions > 0 && numPartitions > maxPartitions) {
+                    throw new RestException(Status.NOT_ACCEPTABLE,
+                            "Number of partitions should be less than or equal to " + maxPartitions);
                 }
-            }
-            return;
-        }
-
-        try {
-            tryCreatePartitionsAsync(numPartitions).get(DEFAULT_OPERATION_TIMEOUT_SEC, TimeUnit.SECONDS);
-            updatePartitionedTopic(topicName, numPartitions, force).get(DEFAULT_OPERATION_TIMEOUT_SEC,
-                    TimeUnit.SECONDS);
-        } catch (Exception e) {
-            if (e.getCause() instanceof RestException) {
-                throw (RestException) e.getCause();
-            }
-            log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+                CompletableFuture<Void> ret;

Review comment:
       done.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r800389304



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -175,6 +175,22 @@
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync() {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenCompose(__ -> {
+                    try {
+                        if (!namespaceResources().namespaceExists(namespaceName)) {

Review comment:
       Yes, got it.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r801705420



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
##########
@@ -2265,7 +2265,7 @@ public void testFailedUpdatePartitionedTopic() throws Exception {
 
         try {
             admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false);
-        } catch (PulsarAdminException.PreconditionFailedException e) {
+        } catch (Exception e) {

Review comment:
       I didn't call ``e.getCause()`` in ``resumeAsyncResponseExceptionally(asyncResponse, ex);``.
       Thanks, fixed.
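
       For context, a small sketch of why the cause matters. A stage chained onto a failed ``CompletableFuture`` sees the failure wrapped in a ``CompletionException``, so a handler that inspects the exception type needs to unwrap it first. ``unwrap`` and ``handledType`` are hypothetical helpers written for this example, not the broker's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class UnwrapSketch {

    // Hypothetical helper: strips the wrapper added by CompletableFuture, if any.
    static Throwable unwrap(Throwable t) {
        if ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    // Returns the simple class name a handler would act on after unwrapping,
    // or "ok" when the future completed normally.
    static String handledType(CompletableFuture<Void> future) {
        return future.thenApply(v -> "ok")
                .exceptionally(ex -> unwrap(ex).getClass().getSimpleName())
                .join();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("precondition failed"));
        System.out.println(handledType(failed));
    }
}
```

       Without the unwrapping step, the handler would only see the wrapper type, which is presumably why the client stopped receiving the specific exception the test expected.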







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14141: [Broker] Make PersistentTopicsBase#internalUpdatePartitionedTopic async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14141:
URL: https://github.com/apache/pulsar/pull/14141#discussion_r801643925



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -228,15 +228,27 @@ public void createNonPartitionedTopic(
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0"
                     + " and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic does not exist")})
-    public void updatePartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void updatePartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("force") @DefaultValue("false") boolean force,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative, force);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalUpdatePartitionedTopicAsync(numPartitions, updateLocalTopicOnly, authoritative, force)
+                    .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception e) {

Review comment:
       yes. 



