You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/28 10:03:02 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make compactionStatus method async (#16231)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 50fa2d4b3f8 [improve][broker][PIP-149]make compactionStatus method async (#16231)
50fa2d4b3f8 is described below
commit 50fa2d4b3f8c2acd84b236824fcfad5ffcbc865a
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Tue Jun 28 18:02:53 2022 +0800
[improve][broker][PIP-149]make compactionStatus method async (#16231)
---
.../pulsar/broker/admin/impl/PersistentTopicsBase.java | 11 +++++------
.../apache/pulsar/broker/admin/v1/PersistentTopics.java | 15 ++++++++++++---
.../apache/pulsar/broker/admin/v2/PersistentTopics.java | 15 ++++++++++++---
3 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f2a7dd5fa05..4c5c4a836a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4063,12 +4063,11 @@ public class PersistentTopicsBase extends AdminResource {
);
}
- protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) {
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.COMPACT);
-
- PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
- return topic.compactionStatus();
+ protected CompletableFuture<LongRunningProcessStatus> internalCompactionStatusAsync(boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.COMPACT))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenApply(topic -> ((PersistentTopic) topic).compactionStatus());
}
protected void internalTriggerOffload(AsyncResponse asyncResponse,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 0e9753428c6..028c17da3d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,7 +44,6 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -916,12 +915,22 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") })
- public LongRunningProcessStatus compactionStatus(
+ public void compactionStatus(
+ @Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalCompactionStatus(authoritative);
+ internalCompactionStatusAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
+ clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0e8267c19c2..1fa09d747a1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,7 +48,6 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
@@ -3006,7 +3005,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
- public LongRunningProcessStatus compactionStatus(
+ public void compactionStatus(
+ @Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -3016,7 +3016,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalCompactionStatus(authoritative);
+ internalCompactionStatusAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get the status of a compaction operation for the topic {}",
+ clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT