You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/09 01:36:56 UTC

[GitHub] [pulsar] mattisonchao commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

mattisonchao commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r802197683



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();
+    }
+
     protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
-        try {
-            validateClusterOwnership(topicName.getCluster());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
                 .thenRun(() -> {
                     validateTopicOperation(topicName, TopicOperation.LOOKUP);

Review comment:
       this method can be async

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -245,13 +244,25 @@ public void updatePartitionedTopic(@PathParam("property") String property, @Path
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+                    .thenAccept(metadata -> asyncResponse.resume(metadata))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                        return null;
+                    });
+        } catch (Exception e) {

Review comment:
       Maybe it's enough that we just capture ``validateTopicName`` ?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();

Review comment:
       Maybe we need to use ``get()`` with a timeout to avoid a long time blocking and unchecked exceptions.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -534,27 +534,37 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();

Review comment:
       Maybe we need to use get() with a timeout to avoid a long time blocking and unchecked exceptions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org