You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/06 15:45:39 UTC

[pulsar] branch master updated: [pulsar-admin] change partitioned topic stats on async rest api (#4487)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1473bda  [pulsar-admin] change partitioned topic stats on async rest api (#4487)
1473bda is described below

commit 1473bda6329fcae97f476c351a84df7990b66b6e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 6 08:45:31 2019 -0700

    [pulsar-admin] change partitioned topic stats on async rest api (#4487)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 98 ++++++++++++++--------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 25 ++++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 24 ++++--
 3 files changed, 101 insertions(+), 46 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b7302f7..bfea5e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -641,7 +642,7 @@ public class PersistentTopicsBase extends AdminResource {
         }, null);
     }
 
-    protected PartitionedTopicStats internalGetPartitionedStats(boolean authoritative) {
+    protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative) {
         PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
         if (partitionMetadata.partitions == 0) {
             throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
@@ -650,38 +651,54 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
-        try {
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                TopicStats partitionStats = pulsar().getAdminClient().topics()
-                        .getStats(topicName.getPartition(i).toString());
-                stats.add(partitionStats);
-                stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
+
+        List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+        for (int i = 0; i < partitionMetadata.partitions; i++) {
+            try {
+                topicStatsFutureList
+                        .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString())));
+            } catch (PulsarServerException e) {
+                asyncResponse.resume(new RestException(e));
+                return;
             }
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+        }
 
+        FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+            CompletableFuture<TopicStats> statFuture = null;
+            for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                statFuture = topicStatsFutureList.get(i);
+                if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+                    try {
+                        stats.add(statFuture.get());
+                        stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                    } catch (Exception e) {
+                        asyncResponse.resume(new RestException(e));
+                        return null;
+                    }
+                }
+            }
+            if (stats.partitions.isEmpty()) {
                 String path = ZkAdminPaths.partitionedTopicPath(topicName);
                 try {
                     boolean zkPathExists = zkPathExists(path);
                     if (zkPathExists) {
                         stats.partitions.put(topicName.toString(), new TopicStats());
                     } else {
-                        throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet");
+                        asyncResponse.resume(
+                                new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+                        return null;
                     }
-                } catch (KeeperException | InterruptedException exception) {
-                    throw new RestException(e);
+                } catch (KeeperException | InterruptedException e) {
+                    asyncResponse.resume(new RestException(e));
+                    return null;
                 }
-
-            } else {
-                throw new RestException(e);
             }
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-        return stats;
+            asyncResponse.resume(stats);
+            return null;
+        });
     }
 
-    protected PartitionedTopicInternalStats internalGetPartitionedStatsInternal(boolean authoritative) {
+    protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
         PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
         if (partitionMetadata.partitions == 0) {
             throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
@@ -690,22 +707,35 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
-        try {
-            for (int i = 0; i < partitionMetadata.partitions; i++) {
-                PersistentTopicInternalStats partitionStats = pulsar().getAdminClient().topics()
-                        .getInternalStats(topicName.getPartition(i).toString());
-                stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
-            }
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
-                throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet");
-            } else {
-                throw new RestException(e);
+
+        List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
+        for (int i = 0; i < partitionMetadata.partitions; i++) {
+            try {
+                topicStatsFutureList.add(pulsar().getAdminClient().topics()
+                        .getInternalStatsAsync((topicName.getPartition(i).toString())));
+            } catch (PulsarServerException e) {
+                asyncResponse.resume(new RestException(e));
+                return;
             }
-        } catch (Exception e) {
-            throw new RestException(e);
         }
-        return stats;
+
+        FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+            CompletableFuture<PersistentTopicInternalStats> statFuture = null;
+            for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                statFuture = topicStatsFutureList.get(i);
+                if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+                    try {
+                        stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                    } catch (Exception e) {
+                        asyncResponse.resume(new RestException(e));
+                        return null;
+                    }
+                }
+            }
+            asyncResponse.resume(!stats.partitions.isEmpty() ? stats
+                    : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+            return null;
+        });
     }
 
     protected void internalDeleteSubscription(String subName, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 7d5cbc4..7c4d807 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -55,6 +56,7 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.container.AsyncResponse;
 
 /**
  */
@@ -271,12 +273,17 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Get the stats for the partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public PartitionedTopicStats getPartitionedStats(@PathParam("property") String property,
+    public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedStats(authoritative);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGetPartitionedStats(asyncResponse, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
 
@@ -285,12 +292,18 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("property") String property,
+    public void getPartitionedStatsInternal(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedStatsInternal(authoritative);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 837a3cb..d2554ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -297,11 +298,16 @@ public class PersistentTopics extends PersistentTopicsBase {
         @ApiResponse(code = 404, message = "Topic does not exist"),
         @ApiResponse(code = 412, message = "Partitioned topic name is invalid")
     })
-    public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") String tenant,
+    public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        return internalGetPartitionedStats(authoritative);
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            internalGetPartitionedStats(asyncResponse, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @GET
@@ -309,11 +315,17 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("tenant") String tenant,
+    public void getPartitionedStatsInternal(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetPartitionedStatsInternal(authoritative);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @DELETE