You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/06/25 11:41:53 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make internalPeekNthMessage method async (#16192)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce24db11ada [improve][broker][PIP-149]make internalPeekNthMessage method async (#16192)
ce24db11ada is described below
commit ce24db11adaf60d9cfb302ce39e76cdc9e43bd2d
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Sat Jun 25 19:41:40 2022 +0800
[improve][broker][PIP-149]make internalPeekNthMessage method async (#16192)
* [improve][broker][PIP-149]make internalPeekNthMessage method async
* fix UT
---
.../broker/admin/impl/PersistentTopicsBase.java | 91 ++++++++++++----------
.../pulsar/broker/admin/v1/PersistentTopics.java | 15 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 +++-
3 files changed, 75 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8b84cd74d74..5ec93241921 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2805,50 +2805,59 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected Response internalPeekNthMessage(String subName, int messagePosition, boolean authoritative) {
+ protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
+ boolean authoritative) {
+ CompletableFuture<Void> ret;
// If the topic name is a partition name, no need to get partition topic metadata again
- if (!topicName.isPartitioned() && getPartitionedTopicMetadata(topicName,
- authoritative, false).partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
- }
-
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.PEEK_MESSAGES);
-
- if (!(getTopicReference(topicName) instanceof PersistentTopic)) {
- log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(), topicName,
- subName);
- throw new RestException(Status.METHOD_NOT_ALLOWED,
- "Peek messages on a non-persistent topic is not allowed");
- }
-
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- PersistentReplicator repl = null;
- PersistentSubscription sub = null;
- Entry entry = null;
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- repl = getReplicatorReference(subName, topic);
+ if (!topicName.isPartitioned()) {
+ ret = getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenCompose(topicMetadata -> {
+ if (topicMetadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Peek messages on a partitioned topic is not allowed");
+ }
+ return CompletableFuture.completedFuture(null);
+ });
} else {
- sub = (PersistentSubscription) getSubscriptionReference(subName, topic);
- }
- try {
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- entry = repl.peekNthMessage(messagePosition).get();
- } else {
- entry = sub.peekNthMessage(messagePosition).get();
- }
- return generateResponseWithEntry(entry);
- } catch (NullPointerException npe) {
- throw new RestException(Status.NOT_FOUND, "Message not found");
- } catch (Exception exception) {
- log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(), messagePosition,
- topicName, subName, exception);
- throw new RestException(exception);
- } finally {
- if (entry != null) {
- entry.release();
- }
+ ret = CompletableFuture.completedFuture(null);
}
+ return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PEEK_MESSAGES))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ CompletableFuture<Entry> entry;
+ if (!(topic instanceof PersistentTopic)) {
+ log.error("[{}] Not supported operation of non-persistent topic {} {}", clientAppId(),
+ topicName, subName);
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Peek messages on a non-persistent topic is not allowed");
+ } else {
+ if (subName.startsWith(((PersistentTopic) topic).getReplicatorPrefix())) {
+ PersistentReplicator repl = getReplicatorReference(subName, (PersistentTopic) topic);
+ entry = repl.peekNthMessage(messagePosition);
+ } else {
+ PersistentSubscription sub =
+ (PersistentSubscription) getSubscriptionReference(subName, (PersistentTopic) topic);
+ entry = sub.peekNthMessage(messagePosition);
+ }
+ }
+ return entry;
+ }).thenCompose(entry -> {
+ try {
+ Response response = generateResponseWithEntry(entry);
+ return CompletableFuture.completedFuture(response);
+ } catch (NullPointerException npe) {
+ throw new RestException(Status.NOT_FOUND, "Message not found");
+ } catch (Exception exception) {
+ log.error("[{}] Failed to peek message at position {} from {} {}", clientAppId(),
+ messagePosition, topicName, subName, exception);
+ throw new RestException(exception);
+ } finally {
+ if (entry != null) {
+ entry.release();
+ }
+ }
+ });
}
protected Response internalExamineMessage(String initialPosition, long messagePosition, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index c6c1e189984..e492f799195 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -740,12 +740,23 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist") })
- public Response peekNthMessage(@PathParam("property") String property, @PathParam("cluster") String cluster,
+ public void peekNthMessage(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String encodedSubName, @PathParam("messagePosition") int messagePosition,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
+ internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
+ topicName, decode(encodedSubName), ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index ad063381343..88067344949 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1694,7 +1694,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
- public Response peekNthMessage(
+ public void peekNthMessage(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -1708,7 +1709,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
+ internalPeekNthMessageAsync(decode(encodedSubName), messagePosition, authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get peek nth message for topic {} subscription {}", clientAppId(),
+ topicName, decode(encodedSubName), ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET