You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/22 01:28:25 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make getInternalStats method async (#16141)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 54e9c7577c7 [improve][broker][PIP-149]make getInternalStats method async (#16141)
54e9c7577c7 is described below
commit 54e9c7577c711f3756d191e625885573a90c54d7
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Wed Jun 22 09:28:17 2022 +0800
[improve][broker][PIP-149]make getInternalStats method async (#16141)
---
.../broker/admin/impl/PersistentTopicsBase.java | 33 +++++++++--------
.../broker/admin/v1/NonPersistentTopics.java | 42 ++++++++++++----------
.../pulsar/broker/admin/v1/PersistentTopics.java | 15 ++++++--
.../broker/admin/v2/NonPersistentTopics.java | 29 ++++++++-------
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++++++--
5 files changed, 79 insertions(+), 54 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9d63b5e5ee3..8b84cd74d74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,7 +38,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -1208,24 +1207,24 @@ public class PersistentTopicsBase extends AdminResource {
getEarliestTimeInBacklog));
}
- protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
+ protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
+ boolean metadata) {
+ CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.GET_STATS);
-
- Topic topic = getTopicReference(topicName);
- try {
- if (metadata) {
- validateTopicOperation(topicName, TopicOperation.GET_METADATA);
- }
- return topic.getInternalStats(metadata).get();
- } catch (Exception e) {
- log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
- (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
+ ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ ret = CompletableFuture.completedFuture(null);
}
+ return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+ .thenCompose(__ -> {
+ if (metadata) {
+ return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA);
+ }
+ return CompletableFuture.completedFuture(null);
+ })
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> topic.getInternalStats(metadata));
}
protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 9cfe0e168de..34e9eadd716 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
@@ -93,25 +92,30 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist")})
- public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace")
- String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false")
- boolean authoritative,
- @QueryParam("metadata") @DefaultValue("false")
- boolean metadata) {
+ public void getInternalStats(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(property, cluster, namespace, encodedTopic);
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.GET_STATS);
- Topic topic = getTopicReference(topicName);
- try {
- boolean includeMetadata = metadata && hasSuperUserAccess();
- return topic.getInternalStats(includeMetadata).get();
- } catch (Exception e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
- (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
- }
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ boolean includeMetadata = metadata && hasSuperUserAccess();
+ return topic.getInternalStats(includeMetadata);
+ })
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f6baf6c4b7a..c6c1e189984 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -419,13 +418,23 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
- public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
+ public void getInternalStats(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetInternalStats(authoritative, metadata);
+ internalGetInternalStatsAsync(authoritative, metadata)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 1a3d2799f47..4e4060fb3c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -112,7 +111,8 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
})
- public PersistentTopicInternalStats getInternalStats(
+ public void getInternalStats(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -123,16 +123,21 @@ public class NonPersistentTopics extends PersistentTopics {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.GET_STATS);
- Topic topic = getTopicReference(topicName);
- try {
- boolean includeMetadata = metadata && hasSuperUserAccess();
- return topic.getInternalStats(includeMetadata).get();
- } catch (Exception e) {
- throw new RestException(Status.INTERNAL_SERVER_ERROR,
- (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
- }
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+ .thenCompose(__ -> {
+ Topic topic = getTopicReference(topicName);
+ boolean includeMetadata = metadata && hasSuperUserAccess();
+ return topic.getInternalStats(includeMetadata);
+ })
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d0033d91e68..ad063381343 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -60,7 +60,6 @@ import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1132,7 +1131,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
- public PersistentTopicInternalStats getInternalStats(
+ public void getInternalStats(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -1143,7 +1143,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalGetInternalStats(authoritative, metadata);
+ internalGetInternalStatsAsync(authoritative, metadata)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET