You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/26 08:08:06 UTC

[GitHub] [pulsar] HQebupt opened a new pull request, #16226: [improve][broker][PIP-149]make examineMessage method async

HQebupt opened a new pull request, #16226:
URL: https://github.com/apache/pulsar/pull/16226

   Master Issue: #14013 
   ### Motivation
   
   See #14365 
   
   ### Verifying this change
   
   Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   If `yes` was chosen, please highlight the changes
   
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API: (no)
   - The schema: (no)
   - The default values of configurations: (no)
   - The wire protocol: (no)
   - The rest endpoints: (no)
   - The admin cli options: (no)
   - Anything that affects deployment: (no)
   
   ### Documentation
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaozhangmin commented on a diff in pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on code in PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#discussion_r907018449


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2851,58 +2851,77 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
         }
     }
 
-    protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a partitioned topic is not allowed, "
-                            + "please try examine message on specific topic partition");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a non-persistent topic is not allowed");
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
 
-        if (messagePosition < 1) {
-            messagePosition = 1;
+        long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
+        String initialPositionLocal = initialPosition == null ? "latest" : initialPosition;
+        if (!topicName.isPartitioned()) {
+            ret = ret.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                    .thenCompose(partitionedTopicMetadata -> {
+                        if (partitionedTopicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Examine messages on a partitioned topic is not allowed, "
+                                            + "please try examine message on specific topic partition");
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
         }
 
-        if (null == initialPosition) {
-            initialPosition = "latest";
-        }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))

Review Comment:
   `validateTopicOperationAsync` should be preceded the `getPartitionedTopicMetadataAsync`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#issuecomment-1166471882

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#discussion_r906923034


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2851,58 +2851,77 @@ protected Response internalPeekNthMessage(String subName, int messagePosition, b
         }
     }
 
-    protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalExamineMessageAsync(String initialPosition, long messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a partitioned topic is not allowed, "
-                            + "please try examine message on specific topic partition");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Examine messages on a non-persistent topic is not allowed");
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
 
-        if (messagePosition < 1) {
-            messagePosition = 1;
+        long messagePositionLocal = messagePosition < 1 ? 1 : messagePosition;
+        String initialPositionLocal = initialPosition == null ? "latest" : initialPosition;
+        if (!topicName.isPartitioned()) {
+            ret = ret.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                    .thenCompose(partitionedTopicMetadata -> {
+                        if (partitionedTopicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Examine messages on a partitioned topic is not allowed, "
+                                            + "please try examine message on specific topic partition");
+                        } else {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
         }
 
-        if (null == initialPosition) {
-            initialPosition = "latest";
-        }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} ", clientAppId(), topicName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Examine messages on a non-persistent topic is not allowed");
+                    }
+                    try {
+                        PersistentTopic persistentTopic = (PersistentTopic) topic;
+                        long totalMessage = persistentTopic.getNumberOfEntries();
+                        PositionImpl startPosition = persistentTopic.getFirstPosition();
+
+                        long messageToSkip = initialPositionLocal.equals("earliest") ? messagePositionLocal :
+                                totalMessage - messagePositionLocal + 1;
+                        CompletableFuture<Entry> future = new CompletableFuture<>();
+                        PositionImpl readPosition = persistentTopic.getPositionAfterN(startPosition, messageToSkip);
+                        persistentTopic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object ctx) {
+                                future.complete(entry);
+                            }
 
-        try {
-            PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-            long totalMessage = topic.getNumberOfEntries();
-            PositionImpl startPosition = topic.getFirstPosition();
-            long messageToSkip =
-                    initialPosition.equals("earliest") ? messagePosition : totalMessage - messagePosition + 1;
-            CompletableFuture<Entry> future = new CompletableFuture<>();
-            PositionImpl readPosition = topic.getPositionAfterN(startPosition, messageToSkip);
-            topic.asyncReadEntry(readPosition, new AsyncCallbacks.ReadEntryCallback() {
-                @Override
-                public void readEntryComplete(Entry entry, Object ctx) {
-                    future.complete(entry);
-                }
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                                future.completeExceptionally(exception);
+                            }
+                        }, null);
+                        return future;
+                    } catch (ManagedLedgerException exception) {
+                        log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(),
+                                messagePosition,
+                                topicName, exception);
+                        throw new RestException(exception);
+                    }
 
-                @Override
-                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-                    future.completeExceptionally(exception);
-                }
-            }, null);
-            return generateResponseWithEntry(future.get());
-        } catch (Exception exception) {
-            exception.printStackTrace();
-            log.error("[{}] Failed to examine message at position {} from {} due to {}", clientAppId(), messagePosition,
-                    topicName, exception);
-            throw new RestException(exception);
-        }
+                }).thenCompose(entry -> {

Review Comment:
   `thenApply` is more better ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#issuecomment-1167073281

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaozhangmin commented on pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
gaozhangmin commented on PR #16226:
URL: https://github.com/apache/pulsar/pull/16226#issuecomment-1167174572

   @Technoboy- PTAL again, changes made.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #16226: [improve][broker][PIP-149]make examineMessage method async

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #16226:
URL: https://github.com/apache/pulsar/pull/16226


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org