Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/27 06:31:07 UTC

[GitHub] [pulsar] gaozhangmin commented on a diff in pull request #16226: [improve][broker][PIP-149]make examineMessage method async

gaozhangmin commented on code in PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#discussion_r907018449


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2851,58 +2851,77 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
         }
     }
 
-    protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a partitioned topic is not allowed, "
-                            + "please try examine message on specific topic partition");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a non-persistent topic is not allowed");
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
 
-        if (messagePosition < 1) {
-            messagePosition = 1;
+        long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
+        String initialPositionLocal = initialPosition == null ? "latest" : initialPosition;
+        if (!topicName.isPartitioned()) {
+            ret = ret.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                    .thenCompose(partitionedTopicMetadata -> {
+                        if (partitionedTopicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Examine messages on a partitioned topic is not allowed, "
+                                            + "please try examine message on specific topic partition");
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
         }
 
-        if (null == initialPosition) {
-            initialPosition = "latest";
-        }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))

Review Comment:
   `validateTopicOperationAsync` should precede `getPartitionedTopicMetadataAsync`.
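
   A minimal sketch of the ordering concern, reading the comment as a request to run the validation step before the partitioned-metadata lookup. Everything here is a hypothetical stand-in (`ExamineOrderSketch`, `validateStep`, `metadataStep`, the `owned` flag); none of it is the Pulsar broker API. It only illustrates that, in a `thenCompose` chain, a validation that fails first keeps the later metadata lookup from running at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the handler; not Pulsar code.
class ExamineOrderSketch {
    final List<String> calls = new ArrayList<>(); // records which steps actually ran
    final boolean owned;                          // assumed outcome of the validation step
    final int partitions;                         // assumed partition count from metadata

    ExamineOrderSketch(boolean owned, int partitions) {
        this.owned = owned;
        this.partitions = partitions;
    }

    // Stand-in for the validation call named in the review comment.
    CompletableFuture<Void> validateStep() {
        calls.add("validate");
        return owned
                ? CompletableFuture.completedFuture(null)
                : CompletableFuture.failedFuture(new IllegalStateException("validation failed"));
    }

    // Stand-in for the partitioned-topic metadata lookup.
    CompletableFuture<Integer> metadataStep() {
        calls.add("metadata");
        return CompletableFuture.completedFuture(partitions);
    }

    // Validation is composed first, so the metadata lookup only runs once it succeeds.
    CompletableFuture<String> examine() {
        return validateStep()
                .thenCompose(ignored -> metadataStep())
                .thenCompose(count -> {
                    if (count > 0) {
                        // Return a failed future instead of throwing inside the lambda.
                        return CompletableFuture.<String>failedFuture(
                                new IllegalStateException("partitioned topic is not allowed"));
                    }
                    calls.add("examine");
                    return CompletableFuture.completedFuture("ok");
                });
    }

    public static void main(String[] args) {
        ExamineOrderSketch denied = new ExamineOrderSketch(false, 0);
        denied.examine().exceptionally(ex -> "failed");
        System.out.println(denied.calls);
    }
}
```

   With validation composed first, a request that fails validation never reaches the metadata lookup. Whether that matches the intended order for this PR is for the author and reviewer to confirm.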


