You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/22 01:28:25 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make getInternalStats method async (#16141)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 54e9c7577c7 [improve][broker][PIP-149]make getInternalStats method async (#16141)
54e9c7577c7 is described below

commit 54e9c7577c711f3756d191e625885573a90c54d7
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Wed Jun 22 09:28:17 2022 +0800

    [improve][broker][PIP-149]make getInternalStats method async (#16141)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 33 +++++++++--------
 .../broker/admin/v1/NonPersistentTopics.java       | 42 ++++++++++++----------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 15 ++++++--
 .../broker/admin/v2/NonPersistentTopics.java       | 29 ++++++++-------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 ++++++--
 5 files changed, 79 insertions(+), 54 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9d63b5e5ee3..8b84cd74d74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -38,7 +38,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -1208,24 +1207,24 @@ public class PersistentTopicsBase extends AdminResource {
                         getEarliestTimeInBacklog));
     }
 
-    protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
+    protected CompletableFuture<PersistentTopicInternalStats> internalGetInternalStatsAsync(boolean authoritative,
+                                                                                            boolean metadata) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.GET_STATS);
-
-        Topic topic = getTopicReference(topicName);
-        try {
-            if (metadata) {
-                validateTopicOperation(topicName, TopicOperation.GET_METADATA);
-            }
-            return topic.getInternalStats(metadata).get();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR,
-                    (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+                .thenCompose(__ -> {
+                    if (metadata) {
+                        return validateTopicOperationAsync(topicName, TopicOperation.GET_METADATA);
+                    }
+                    return CompletableFuture.completedFuture(null);
+                })
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> topic.getInternalStats(metadata));
     }
 
     protected void internalGetManagedLedgerInfo(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 9cfe0e168de..34e9eadd716 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -93,25 +92,30 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist")})
-    public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
-                                                         @PathParam("cluster") String cluster, @PathParam("namespace")
-                                                                     String namespace,
-                                                         @PathParam("topic") @Encoded String encodedTopic,
-                                                         @QueryParam("authoritative") @DefaultValue("false")
-                                                                     boolean authoritative,
-                                                         @QueryParam("metadata") @DefaultValue("false")
-                                                                     boolean metadata) {
+    public void getInternalStats(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.GET_STATS);
-        Topic topic = getTopicReference(topicName);
-        try {
-            boolean includeMetadata = metadata && hasSuperUserAccess();
-            return topic.getInternalStats(includeMetadata).get();
-        } catch (Exception e) {
-            throw new RestException(Status.INTERNAL_SERVER_ERROR,
-                    (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    boolean includeMetadata = metadata && hasSuperUserAccess();
+                    return topic.getInternalStats(includeMetadata);
+                })
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f6baf6c4b7a..c6c1e189984 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -50,7 +50,6 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -419,13 +418,23 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public PersistentTopicInternalStats getInternalStats(@PathParam("property") String property,
+    public void getInternalStats(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetInternalStats(authoritative, metadata);
+        internalGetInternalStatsAsync(authoritative, metadata)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 1a3d2799f47..4e4060fb3c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -112,7 +111,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
     })
-    public PersistentTopicInternalStats getInternalStats(
+    public void getInternalStats(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -123,16 +123,21 @@ public class NonPersistentTopics extends PersistentTopics {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(tenant, namespace, encodedTopic);
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.GET_STATS);
-        Topic topic = getTopicReference(topicName);
-        try {
-            boolean includeMetadata = metadata && hasSuperUserAccess();
-            return topic.getInternalStats(includeMetadata).get();
-        } catch (Exception e) {
-            throw new RestException(Status.INTERNAL_SERVER_ERROR,
-                    (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
-        }
+        validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_STATS))
+                .thenCompose(__ -> {
+                    Topic topic = getTopicReference(topicName);
+                    boolean includeMetadata = metadata && hasSuperUserAccess();
+                    return topic.getInternalStats(includeMetadata);
+                })
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d0033d91e68..ad063381343 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -60,7 +60,6 @@ import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
-import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
 import org.apache.pulsar.common.policies.data.PublishRate;
@@ -1132,7 +1131,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
-    public PersistentTopicInternalStats getInternalStats(
+    public void getInternalStats(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -1143,7 +1143,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetInternalStats(authoritative, metadata);
+        internalGetInternalStatsAsync(authoritative, metadata)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET