You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/27 11:49:18 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make examineMessage method async (#16226)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d03731db69 [improve][broker][PIP-149]make examineMessage method async (#16226)
0d03731db69 is described below

commit 0d03731db698085eeaee4d56218ee739322867a8
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 19:49:06 2022 +0800

    [improve][broker][PIP-149]make examineMessage method async (#16226)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 110 ++++++++++++---------
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  14 ++-
 2 files changed, 76 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 75ba3b2c7db..c95928d9029 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2857,58 +2857,76 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a partitioned topic is not allowed, "
-                            + "please try examine message on specific topic partition");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a non-persistent topic is not allowed");
-        }
-
-        if (messagePosition < 1) {
-            messagePosition = 1;
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
 
-        if (null == initialPosition) {
-            initialPosition = "latest";
+        ret = ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative));
+        long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
+        String initialPositionLocal = initialPosition == null ? "latest" : initialPosition;
+        if (!topicName.isPartitioned()) {
+            ret = ret.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                    .thenCompose(partitionedTopicMetadata -> {
+                        if (partitionedTopicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Examine messages on a partitioned topic is not allowed, "
+                                            + "please try examine message on specific topic partition");
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
         }
+        return ret.thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Examine messages on a non-persistent topic is not allowed");
+                    }
+                    try {
+                        PersistentTopic persistentTopic = (PersistentTopic) topic;
+                        long totalMessage = persistentTopic.getNumberOfEntries();
+                        PositionImpl startPosition = persistentTopic.getFirstPosition();
+
+                        long messageToSkip = initialPositionLocal.equals("earliest") ? messagePositionLocal :
+                                totalMessage - messagePositionLocal + 1;
+                        CompletableFuture<Entry> future = new CompletableFuture<>();
+                        PositionImpl readPosition = persistentTopic.getPositionAfterN(startPosition, messageToSkip);
+                        persistentTopic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object ctx) {
+                                future.complete(entry);
+                            }
 
-        try {
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            long totalMessage = topic.getNumberOfEntries();
-            PositionImpl startPosition = topic.getFirstPosition();
-            long messageToSkip =
-                    initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1;
-            CompletableFuture<Entry> future = new CompletableFuture<>();
-            PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
-            topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    future.complete(entry);
-                }
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                                future.completeExceptionally(exception);
+                            }
+                        }, null);
+                        return future;
+                    } catch (ManagedLedgerException exception) {
+                        log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
+                                messagePosition,
+                                topicName, exception);
+                        throw new RestException(exception);
+                    }
 
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                    future.completeExceptionally(exception);
-                }
-            }, null);
-            return generateResponseWithEntry(future.get());
-        } catch (Exception exception) {
-            exception.printStackTrace();
-            log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition,
-                    topicName, exception);
-            throw new RestException(exception);
-        }
+                }).thenApply(entry -> {
+                    try {
+                        return generateResponseWithEntry(entry);
+                    } catch (IOException exception) {
+                        throw new RestException(exception);
+                    } finally {
+                        if (entry != null) {
+                            entry.release();
+                        }
+                    }
+                });
     }
 
     private Response generateResponseWithEntry(Entry entry) throws IOException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 55bacfec03c..bfcbd3169d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1743,7 +1743,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "If given partitioned topic"),
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error")})
-    public Response examineMessage(
+    public void examineMessage(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -1761,7 +1762,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalExamineMessage(initialPosition, messagePosition, authoritative);
+        internalExamineMessageAsync(initialPosition, messagePosition, authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to examine a specific message on the topic {}", clientAppId(), topicName,
+                                ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET