Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/07 15:26:44 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Technoboy- opened a new pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153


   ### Motivation
   
   Make PersistentTopicsBase#internalGetPartitionedMetadata async.
   
   ### Documentation
     
   - [x] `no-need-doc` 
     
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] mattisonchao commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r802197683



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();
+    }
+
     protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
-        try {
-            validateClusterOwnership(topicName.getCluster());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
                 .thenRun(() -> {
                     validateTopicOperation(topicName, TopicOperation.LOOKUP);

Review comment:
       This method can be made async.
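
The comment above presumably refers to the synchronous `validateTopicOperation` call inside `thenRun`. As a minimal sketch of the difference, assuming an asynchronous counterpart of the check exists or is added, the example below contrasts the two chain shapes. `validateSync` and `validateAsync` are hypothetical stand-ins and not Pulsar APIs.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for a permission check; these are not Pulsar methods.
class ComposeChecks {

    // Synchronous check: signals failure by throwing.
    static void validateSync(boolean allowed) {
        if (!allowed) {
            throw new SecurityException("denied");
        }
    }

    // Asynchronous check: signals failure through the returned future.
    static CompletableFuture<Void> validateAsync(boolean allowed) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (allowed) {
            result.complete(null);
        } else {
            result.completeExceptionally(new SecurityException("denied"));
        }
        return result;
    }

    // Shape in the diff: the synchronous check runs inside thenRun, so any
    // blocking it does happens on the thread that completes the previous stage.
    static CompletableFuture<String> withThenRun(boolean allowed) {
        return CompletableFuture.completedFuture("namespace-ok")
                .thenRun(() -> validateSync(allowed))
                .thenApply(ignored -> "metadata");
    }

    // Alternative: the asynchronous check is chained with thenCompose, so the
    // pipeline waits on its future instead of on a blocking call.
    static CompletableFuture<String> withThenCompose(boolean allowed) {
        return CompletableFuture.completedFuture("namespace-ok")
                .thenCompose(ignored -> validateAsync(allowed))
                .thenApply(ignored -> "metadata");
    }

    public static void main(String[] args) {
        System.out.println(withThenRun(true).join());
        System.out.println(withThenCompose(true).join());
        System.out.println(withThenCompose(false).isCompletedExceptionally());
    }
}
```

Both shapes fail the resulting future when the check fails; they differ only in where the work of the check is allowed to happen.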

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
##########
@@ -245,13 +244,25 @@ public void updatePartitionedTopic(@PathParam("property") String property, @Path
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+                    .thenAccept(metadata -> asyncResponse.resume(metadata))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                        return null;
+                    });
+        } catch (Exception e) {

Review comment:
       Maybe it is enough to wrap only ``validateTopicName`` in the try/catch?
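
A minimal sketch of the narrower try/catch, assuming the asynchronous method never throws synchronously and reports every failure through its future. `validateName`, `lookupAsync`, and the `respond` callback are hypothetical stand-ins for `validateTopicName`, `internalGetPartitionedMetadataAsync`, and the `AsyncResponse`; none of them is a Pulsar or JAX-RS API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

// Hypothetical stand-ins; not Pulsar or JAX-RS APIs.
class NarrowCatch {

    // Synchronous validation: throws on bad input.
    static void validateName(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("invalid name");
        }
    }

    // Asynchronous lookup: failures are delivered through the future.
    static CompletableFuture<Integer> lookupAsync(String name) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        if (name.equals("missing")) {
            result.completeExceptionally(new IllegalStateException("not found"));
        } else {
            result.complete(name.length());
        }
        return result;
    }

    static void handle(String name, Consumer<String> respond) {
        try {
            validateName(name); // the only call guarded by the try/catch
        } catch (RuntimeException e) {
            respond.accept("error:" + e.getMessage());
            return;
        }
        lookupAsync(name)
                .thenAccept(n -> respond.accept("ok:" + n))
                .exceptionally(ex -> {
                    // Dependent stages wrap the failure in CompletionException.
                    Throwable cause = ex;
                    if (ex instanceof CompletionException && ex.getCause() != null) {
                        cause = ex.getCause();
                    }
                    respond.accept("error:" + cause.getMessage());
                    return null;
                });
    }

    public static void main(String[] args) {
        handle("", System.out::println);
        handle("missing", System.out::println);
        handle("topic", System.out::println);
    }
}
```

If the asynchronous method could still throw before returning its future, the wider try/catch from the diff would remain the safer choice.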

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();

Review comment:
       Maybe we should use ``get()`` with a timeout, to avoid blocking for a long time and to avoid unchecked exceptions.
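
For context, `join()` waits without a bound and reports failures as an unchecked `CompletionException`, while `get(timeout, unit)` bounds the wait and uses checked exceptions. A minimal sketch of the bounded form follows; `BoundedWait` and its string results are hypothetical and only illustrate the separate failure modes, and the timeout values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Pulsar: waits for a future with an upper
// bound and reports each failure mode separately.
class BoundedWait {

    static <T> String await(CompletableFuture<T> future, long timeoutMillis) {
        try {
            T value = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "ok:" + value;
        } catch (TimeoutException e) {
            // The future did not complete within the bound.
            return "timeout";
        } catch (ExecutionException e) {
            // The future completed exceptionally; the original error is the cause.
            return "failed:" + e.getCause().getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(await(CompletableFuture.completedFuture(3), 100));
        System.out.println(await(new CompletableFuture<Integer>(), 50));
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(await(failed, 100));
    }
}
```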

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -534,27 +534,37 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();

Review comment:
       Maybe we should use get() with a timeout, to avoid blocking for a long time and to avoid unchecked exceptions.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r801231595



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2515,8 +2526,8 @@ public void readEntryComplete(Entry entry, Object ctx) {
 
     protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadataAsync(topicName,

Review comment:
       Yes, right. It was a mistake to modify this here.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r802387948



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -534,27 +534,37 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();

Review comment:
       ok







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r802391829



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();
+    }
+
     protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
-        try {
-            validateClusterOwnership(topicName.getCluster());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
                 .thenRun(() -> {
                     validateTopicOperation(topicName, TopicOperation.LOOKUP);

Review comment:
       ok







[GitHub] [pulsar] RobertIndie commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
RobertIndie commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r801208457



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2515,8 +2526,8 @@ public void readEntryComplete(Entry entry, Object ctx) {
 
     protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadataAsync(topicName,

Review comment:
       It seems that changing this will not make any difference, since https://github.com/apache/pulsar/pull/14039 is already fixing the synchronous call here. I think it's better not to change it here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -551,27 +551,38 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();
+    }
+
+    protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
+                                                                          boolean authoritative,
+                                                                          boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
+                .thenCompose(metadata -> {
+                    CompletableFuture<Void> ret;
+                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                        // The topic may be a non-partitioned topic, so check if it exists here.
+                        // However, when checkAllowAutoCreation is true, the client will create the topic if
+                        // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
+                        // is a non-partitioned topic so we shouldn't check if the topic exists.
+                        ret = internalCheckTopicExists(topicName);
+                    } else if (metadata.partitions > 1) {
+                        ret = internalValidateClientVersionAsync();
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);

Review comment:
       ```suggestion
                           return CompletableFuture.completedFuture(metadata);
   ```
   This seems clearer?
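
The two shapes under discussion can be compared in a reduced form. This is only a sketch: `checkExists` is a hypothetical stand-in for the checks in the diff, and an `int` partition count stands in for the metadata object.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the checks in the diff; not Pulsar APIs.
class BranchShapes {

    static CompletableFuture<Void> checkExists(boolean exists) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (exists) {
            result.complete(null);
        } else {
            result.completeExceptionally(new IllegalStateException("Topic not exist"));
        }
        return result;
    }

    // Shape A: every branch produces a CompletableFuture<Void>, and the value
    // is mapped back in once at the end.
    static CompletableFuture<Integer> viaVoid(int partitions, boolean exists) {
        return CompletableFuture.completedFuture(partitions).thenCompose(p -> {
            CompletableFuture<Void> ret;
            if (p == 0) {
                ret = checkExists(exists);
            } else {
                ret = CompletableFuture.completedFuture(null);
            }
            return ret.thenApply(ignored -> p);
        });
    }

    // Shape B: each branch returns a future of the final value directly.
    static CompletableFuture<Integer> direct(int partitions, boolean exists) {
        return CompletableFuture.completedFuture(partitions).thenCompose(p -> {
            if (p == 0) {
                return checkExists(exists).thenApply(ignored -> p);
            }
            return CompletableFuture.completedFuture(p);
        });
    }

    public static void main(String[] args) {
        System.out.println(viaVoid(3, true).join());
        System.out.println(direct(3, true).join());
        System.out.println(direct(0, false).isCompletedExceptionally());
    }
}
```

Both shapes produce the same results; the choice is about readability, with shape A keeping a single exit point and shape B avoiding the intermediate `Void` future.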







[GitHub] [pulsar] github-actions[bot] commented on pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#issuecomment-1072916847


   The PR had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] Technoboy- commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r801232897



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -551,27 +551,38 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();
+    }
+
+    protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
+                                                                          boolean authoritative,
+                                                                          boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
+                .thenCompose(metadata -> {
+                    CompletableFuture<Void> ret;
+                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                        // The topic may be a non-partitioned topic, so check if it exists here.
+                        // However, when checkAllowAutoCreation is true, the client will create the topic if
+                        // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
+                        // is a non-partitioned topic so we shouldn't check if the topic exists.
+                        ret = internalCheckTopicExists(topicName);
+                    } else if (metadata.partitions > 1) {
+                        ret = internalValidateClientVersionAsync();
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);

Review comment:
       I think it's ok here.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r802383842



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
##########
@@ -425,18 +425,19 @@ protected void setServletContext(ServletContext servletContext) {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation).join();

Review comment:
       yes, right.



