You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/24 02:14:10 UTC

[GitHub] [pulsar] HQebupt commented on a diff in pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

HQebupt commented on code in PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#discussion_r905662410


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2805,50 +2805,63 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = generateResponseWithEntry(entry);
+                            return CompletableFuture.completedFuture(response);
+                        } catch (NullPointerException npe) {
+                            throw new RestException(Status.NOT_FOUND, "Message not found");

Review Comment:
   It has to, because the `generateResponseWithEntry` method may throw Exception. And the test case `AuthorizationProducerConsumerTest` contains the case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org