You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/06 15:45:39 UTC
[pulsar] branch master updated: [pulsar-admin] change partitioned topic stats on async rest api (#4487)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1473bda [pulsar-admin] change partitioned topic stats on async rest api (#4487)
1473bda is described below
commit 1473bda6329fcae97f476c351a84df7990b66b6e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 6 08:45:31 2019 -0700
[pulsar-admin] change partitioned topic stats on async rest api (#4487)
---
.../broker/admin/impl/PersistentTopicsBase.java | 98 ++++++++++++++--------
.../pulsar/broker/admin/v1/PersistentTopics.java | 25 ++++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 24 ++++--
3 files changed, 101 insertions(+), 46 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index b7302f7..bfea5e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -641,7 +642,7 @@ public class PersistentTopicsBase extends AdminResource {
}, null);
}
- protected PartitionedTopicStats internalGetPartitionedStats(boolean authoritative) {
+ protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
@@ -650,38 +651,54 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicStats stats = new PartitionedTopicStats(partitionMetadata);
- try {
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicStats partitionStats = pulsar().getAdminClient().topics()
- .getStats(topicName.getPartition(i).toString());
- stats.add(partitionStats);
- stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
+
+ List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ topicStatsFutureList
+ .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString())));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
}
- } catch (PulsarAdminException e) {
- if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+ }
+ FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+ CompletableFuture<TopicStats> statFuture = null;
+ for (int i = 0; i < topicStatsFutureList.size(); i++) {
+ statFuture = topicStatsFutureList.get(i);
+ if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+ try {
+ stats.add(statFuture.get());
+ stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+ }
+ }
+ if (stats.partitions.isEmpty()) {
String path = ZkAdminPaths.partitionedTopicPath(topicName);
try {
boolean zkPathExists = zkPathExists(path);
if (zkPathExists) {
stats.partitions.put(topicName.toString(), new TopicStats());
} else {
- throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet");
+ asyncResponse.resume(
+ new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ return null;
}
- } catch (KeeperException | InterruptedException exception) {
- throw new RestException(e);
+ } catch (KeeperException | InterruptedException e) {
+ asyncResponse.resume(new RestException(e));
+ return null;
}
-
- } else {
- throw new RestException(e);
}
- } catch (Exception e) {
- throw new RestException(e);
- }
- return stats;
+ asyncResponse.resume(stats);
+ return null;
+ });
}
- protected PartitionedTopicInternalStats internalGetPartitionedStatsInternal(boolean authoritative) {
+ protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
if (partitionMetadata.partitions == 0) {
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
@@ -690,22 +707,35 @@ public class PersistentTopicsBase extends AdminResource {
validateGlobalNamespaceOwnership(namespaceName);
}
PartitionedTopicInternalStats stats = new PartitionedTopicInternalStats(partitionMetadata);
- try {
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- PersistentTopicInternalStats partitionStats = pulsar().getAdminClient().topics()
- .getInternalStats(topicName.getPartition(i).toString());
- stats.partitions.put(topicName.getPartition(i).toString(), partitionStats);
- }
- } catch (PulsarAdminException e) {
- if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
- throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet");
- } else {
- throw new RestException(e);
+
+ List<CompletableFuture<PersistentTopicInternalStats>> topicStatsFutureList = Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ topicStatsFutureList.add(pulsar().getAdminClient().topics()
+ .getInternalStatsAsync((topicName.getPartition(i).toString())));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
}
- } catch (Exception e) {
- throw new RestException(e);
}
- return stats;
+
+ FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+ CompletableFuture<PersistentTopicInternalStats> statFuture = null;
+ for (int i = 0; i < topicStatsFutureList.size(); i++) {
+ statFuture = topicStatsFutureList.get(i);
+ if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+ try {
+ stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+ }
+ }
+ asyncResponse.resume(!stats.partitions.isEmpty() ? stats
+ : new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"));
+ return null;
+ });
}
protected void internalDeleteSubscription(String subName, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 7d5cbc4..7c4d807 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
@@ -55,6 +56,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.container.AsyncResponse;
/**
*/
@@ -271,12 +273,17 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiOperation(hidden = true, value = "Get the stats for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
- public PartitionedTopicStats getPartitionedStats(@PathParam("property") String property,
+ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetPartitionedStats(authoritative);
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalGetPartitionedStats(asyncResponse, authoritative);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@@ -285,12 +292,18 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
- public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("property") String property,
+ public void getPartitionedStatsInternal(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetPartitionedStatsInternal(authoritative);
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 837a3cb..d2554ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
@@ -297,11 +298,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid")
})
- public PartitionedTopicStats getPartitionedStats(@PathParam("tenant") String tenant,
+ public void getPartitionedStats(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- validatePartitionedTopicName(tenant, namespace, encodedTopic);
- return internalGetPartitionedStats(authoritative);
+ try {
+ validatePartitionedTopicName(tenant, namespace, encodedTopic);
+ internalGetPartitionedStats(asyncResponse, authoritative);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@GET
@@ -309,11 +315,17 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist") })
- public PartitionedTopicInternalStats getPartitionedStatsInternal(@PathParam("tenant") String tenant,
+ public void getPartitionedStatsInternal(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
- return internalGetPartitionedStatsInternal(authoritative);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPartitionedStatsInternal(asyncResponse, authoritative);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@DELETE