You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/06/25 11:41:53 UTC

[pulsar] branch master updated: [improve][broker][PIP-149] make internalPeekNthMessage method async (#16192)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce24db11ada [improve][broker][PIP-149] make internalPeekNthMessage method async (#16192)
ce24db11ada is described below

commit ce24db11adaf60d9cfb302ce39e76cdc9e43bd2d
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat Jun 25 19:41:40 2022 +0800

    [improve][broker][PIP-149] make internalPeekNthMessage method async (#16192)
    
    * [improve][broker][PIP-149] make internalPeekNthMessage method async
    
    * fix unit test (UT)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 91 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 15 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 +++-
 3 files changed, 75 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8b84cd74d74..5ec93241921 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2805,50 +2805,59 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    try {
+                        Response response = generateResponseWithEntry(entry);
+                        return CompletableFuture.completedFuture(response);
+                    } catch (NullPointerException npe) {
+                        throw new RestException(Status.NOT_FOUND, "Message not found");
+                    } catch (Exception exception) {
+                        log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
+                                messagePosition, topicName, subName, exception);
+                        throw new RestException(exception);
+                    } finally {
+                        if (entry != null) {
+                            entry.release();
+                        }
+                    }
+                });
     }
 
     protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c6c1e189984..e492f799195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -740,12 +740,23 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
-    public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void peekNthMessage(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @PathParam("subName") String encodedSubName, @PathParam("messagePosition") int messagePosition,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
+        internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
+                                topicName, decode(encodedSubName), ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ad063381343..88067344949 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1694,7 +1694,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
-    public Response peekNthMessage(
+    public void peekNthMessage(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -1708,7 +1709,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
+        internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
+                                topicName, decode(encodedSubName), ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET