Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/08 02:07:27 UTC

[GitHub] [pulsar] RobertIndie commented on a change in pull request #14153: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async

RobertIndie commented on a change in pull request #14153:
URL: https://github.com/apache/pulsar/pull/14153#discussion_r801208457



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2515,8 +2526,8 @@ public void readEntryComplete(Entry entry, Object ctx) {
 
     protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
+        if (!topicName.isPartitioned() && getPartitionedTopicMetadataAsync(topicName,

Review comment:
       It seems this change will not make any difference, since https://github.com/apache/pulsar/pull/14039 is already fixing the synchronous call here. I think it's better not to change it in this PR.
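       One way to read this comment: if a method still returns a plain value, swapping a synchronous call for an async call followed by a blocking wait leaves the caller thread waiting just as long. The sketch below is only an illustration of that general point. `BlockingJoinSketch`, `getPartitionsAsync`, and `getPartitionsBlocking` are hypothetical names, not Pulsar APIs, and the 200 ms delay is an arbitrary stand-in for a metadata lookup.

       ```java
       import java.util.concurrent.CompletableFuture;
       import java.util.concurrent.TimeUnit;

       class BlockingJoinSketch {
           // Hypothetical async lookup: completes on another thread after a delay.
           static CompletableFuture<Integer> getPartitionsAsync(long delayMillis) {
               return CompletableFuture.supplyAsync(() -> 3,
                       CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
           }

           // Sync-style wrapper: join() parks the caller until the future completes,
           // so the caller gains nothing from the lookup being async underneath.
           static int getPartitionsBlocking(long delayMillis) {
               return getPartitionsAsync(delayMillis).join();
           }

           public static void main(String[] args) {
               long start = System.nanoTime();
               int partitions = getPartitionsBlocking(200);
               long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
               System.out.println("partitions=" + partitions
                       + ", caller waited about " + elapsedMillis + " ms");
           }
       }
       ```

       Under this reading, the benefit only appears once the whole call chain returns a future, which would be consistent with leaving this call site to the other PR.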

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -551,27 +551,38 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation).join();
+    }
+
+    protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
+                                                                          boolean authoritative,
+                                                                          boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
+                .thenCompose(metadata -> {
+                    CompletableFuture<Void> ret;
+                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                        // The topic may be a non-partitioned topic, so check if it exists here.
+                        // However, when checkAllowAutoCreation is true, the client will create the topic if
+                        // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
+                        // is a non-partitioned topic so we shouldn't check if the topic exists.
+                        ret = internalCheckTopicExists(topicName);
+                    } else if (metadata.partitions > 1) {
+                        ret = internalValidateClientVersionAsync();
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);

Review comment:
       ```suggestion
                           return CompletableFuture.completedFuture(metadata);
       ```
       This seems clearer?
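
       A minimal sketch of what the suggested shape could look like if every branch returns the metadata future directly instead of assigning to `ret`. This is an assumption about the intent, not the code in the PR: `Metadata`, `checkTopicExists`, and `validateClientVersion` are hypothetical stand-ins for `PartitionedTopicMetadata`, `internalCheckTopicExists`, and `internalValidateClientVersionAsync`, and the stubs here always succeed.

       ```java
       import java.util.concurrent.CompletableFuture;

       class EarlyReturnSketch {
           // Hypothetical stand-in for PartitionedTopicMetadata.
           static final class Metadata {
               final int partitions;
               Metadata(int partitions) { this.partitions = partitions; }
           }

           // Hypothetical stand-ins for the async checks; both always succeed here.
           static CompletableFuture<Void> checkTopicExists() {
               return CompletableFuture.completedFuture(null);
           }

           static CompletableFuture<Void> validateClientVersion() {
               return CompletableFuture.completedFuture(null);
           }

           static CompletableFuture<Metadata> getMetadata(CompletableFuture<Metadata> source,
                                                          boolean checkAllowAutoCreation) {
               return source.thenCompose(metadata -> {
                   if (metadata.partitions == 0 && !checkAllowAutoCreation) {
                       // Run the existence check, then hand the metadata back.
                       return checkTopicExists().thenApply(ignored -> metadata);
                   } else if (metadata.partitions > 1) {
                       // Run the client version check, then hand the metadata back.
                       return validateClientVersion().thenApply(ignored -> metadata);
                   }
                   // No check needed: return the metadata directly.
                   return CompletableFuture.completedFuture(metadata);
               });
           }

           public static void main(String[] args) {
               for (int partitions : new int[] {0, 1, 4}) {
                   Metadata result = getMetadata(
                           CompletableFuture.completedFuture(new Metadata(partitions)), false).join();
                   System.out.println(result.partitions);
               }
           }
       }
       ```

       With this shape there is no intermediate `CompletableFuture<Void>` variable and no trailing step to map it back to the metadata, at the cost of repeating `thenApply(ignored -> metadata)` in the two checked branches.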



