You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/28 10:03:02 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make compactionStatus method async (#16231)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 50fa2d4b3f8 [improve][broker][PIP-149]make compactionStatus method async (#16231)
50fa2d4b3f8 is described below

commit 50fa2d4b3f8c2acd84b236824fcfad5ffcbc865a
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Tue Jun 28 18:02:53 2022 +0800

    [improve][broker][PIP-149]make compactionStatus method async (#16231)
---
 .../pulsar/broker/admin/impl/PersistentTopicsBase.java    | 11 +++++------
 .../apache/pulsar/broker/admin/v1/PersistentTopics.java   | 15 ++++++++++++---
 .../apache/pulsar/broker/admin/v2/PersistentTopics.java   | 15 ++++++++++++---
 3 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2a7dd5fa05..4c5c4a836a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4063,12 +4063,11 @@ public class PersistentTopicsBase extends AdminResource {
         );
     }
 
-    protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.COMPACT);
-
-        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
-        return topic.compactionStatus();
+    protected CompletableFuture<LongRunningProcessStatus> internalCompactionStatusAsync(boolean authoritative) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.COMPACT))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenApply(topic -> ((PersistentTopic) topic).compactionStatus());
     }
 
     protected void internalTriggerOffload(AsyncResponse asyncResponse,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 0e9753428c6..028c17da3d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,7 +44,6 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -916,12 +915,22 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
             @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
-    public LongRunningProcessStatus compactionStatus(
+    public void compactionStatus(
+            @Suspended final AsyncResponse asyncResponse,
             @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalCompactionStatus(authoritative);
+        internalCompactionStatusAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
+                                clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0e8267c19c2..1fa09d747a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,7 +48,6 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
@@ -3006,7 +3005,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
-    public LongRunningProcessStatus compactionStatus(
+    public void compactionStatus(
+            @Suspended AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -3016,7 +3016,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalCompactionStatus(authoritative);
+        internalCompactionStatusAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
+                                clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT