You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/23 07:02:11 UTC

[GitHub] [pulsar] HQebupt opened a new pull request, #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

HQebupt opened a new pull request, #16192:
URL: https://github.com/apache/pulsar/pull/16192

   Master Issue: #14013 
   ### Motivation
   
   See #14365 
   
   ### Verifying this change
   
   Make sure that the change passes the CI checks.
   
   ### Does this pull request potentially affect one of the following parts:
   
   If `yes` was chosen, please highlight the changes
   
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API: (no)
   - The schema: (no)
   - The default values of configurations: (no)
   - The wire protocol: (no)
   - The rest endpoints: (no)
   - The admin cli options: (no)
   - Anything that affects deployment: (no)
   
   ### Documentation
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on a diff in pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on code in PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#discussion_r905662410


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2805,50 +2805,63 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = generateResponseWithEntry(entry);
+                            return CompletableFuture.completedFuture(response);
+                        } catch (NullPointerException npe) {
+                            throw new RestException(Status.NOT_FOUND, "Message not found");

Review Comment:
   It has to, because the `generateResponseWithEntry` method may throw Exception. And the test case `AuthorizationProducerConsumerTest` contains the case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1165110188

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1165204379

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1164030895

   @liudezhi2098 @Technoboy- @mattisonchao PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#discussion_r905201293


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2805,50 +2805,63 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = generateResponseWithEntry(entry);
+                            return CompletableFuture.completedFuture(response);
+                        } catch (NullPointerException npe) {
+                            throw new RestException(Status.NOT_FOUND, "Message not found");

Review Comment:
   No need to catch NPE, because we have check `entry != null`before



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1164584065

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaozhangmin merged pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
gaozhangmin merged PR #16192:
URL: https://github.com/apache/pulsar/pull/16192


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1165244350

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1164196053

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#discussion_r905201293


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2805,50 +2805,63 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = generateResponseWithEntry(entry);
+                            return CompletableFuture.completedFuture(response);
+                        } catch (NullPointerException npe) {
+                            throw new RestException(Status.NOT_FOUND, "Message not found");

Review Comment:
   No need to catch NPE, because we have checked `entry != null`before



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] HQebupt commented on pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
HQebupt commented on PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#issuecomment-1165566973

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16192: [improve][broker][PIP-149]make internalPeekNthMessage method async

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16192:
URL: https://github.com/apache/pulsar/pull/16192#discussion_r905203394


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -2805,50 +2805,63 @@ protected CompletableFuture<MessageId> internalGetMessageIdByTimestamp(long time
         }
     }
 
-    protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+    protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+                                                                      boolean authoritative) {
+        CompletableFuture<Void> ret;
         // If the topic name is a partition name, no need to get partition topic metadata again
-        if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
-                authoritative, false).partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
-        }
-
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
-        if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
-            log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
-                    subName);
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
-                    "Peek messages on a non-persistent topic is not allowed");
-        }
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        PersistentReplicator repl = null;
-        PersistentSubscription sub = null;
-        Entry entry = null;
-        if (subName.startsWith(topic.getReplicatorPrefix())) {
-            repl = getReplicatorReference(subName, topic);
+        if (!topicName.isPartitioned()) {
+            ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                    .thenCompose(topicMetadata -> {
+                        if (topicMetadata.partitions > 0) {
+                            throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                    "Peek messages on a partitioned topic is not allowed");
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    });
         } else {
-            sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
-        }
-        try {
-            if (subName.startsWith(topic.getReplicatorPrefix())) {
-                entry = repl.peekNthMessage(messagePosition).get();
-            } else {
-                entry = sub.peekNthMessage(messagePosition).get();
-            }
-            return generateResponseWithEntry(entry);
-        } catch (NullPointerException npe) {
-            throw new RestException(Status.NOT_FOUND, "Message not found");
-        } catch (Exception exception) {
-            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
-                    topicName, subName, exception);
-            throw new RestException(exception);
-        } finally {
-            if (entry != null) {
-                entry.release();
-            }
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    CompletableFuture<Entry> entry;
+                    if (!(topic instanceof PersistentTopic)) {
+                        log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+                                topicName, subName);
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Peek messages on a non-persistent topic is not allowed");
+                    } else {
+                        if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+                            PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+                            entry = repl.peekNthMessage(messagePosition);
+                        } else {
+                            PersistentSubscription sub =
+                                    (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+                            entry = sub.peekNthMessage(messagePosition);
+                        }
+                    }
+                    return entry;
+                }).thenCompose(entry -> {
+                    if (entry != null) {
+                        try {
+                            Response response = generateResponseWithEntry(entry);
+                            return CompletableFuture.completedFuture(response);
+                        } catch (NullPointerException npe) {
+                            throw new RestException(Status.NOT_FOUND, "Message not found");
+                        } catch (Exception exception) {
+                            log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
+                                    messagePosition, topicName, subName, exception);
+                            throw new RestException(exception);
+                        } finally {
+                            if (entry != null) {

Review Comment:
   Can entry be null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org