You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/30 02:42:24 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]Make getList async (#16221)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 92d229db95c [improve][broker][PIP-149]Make getList async (#16221)
92d229db95c is described below
commit 92d229db95c61ef70861f18b3cb91b0c3963a3ce
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Thu Jun 30 10:42:17 2022 +0800
[improve][broker][PIP-149]Make getList async (#16221)
---
.../broker/admin/impl/PersistentTopicsBase.java | 38 +++++++++++++++++++---
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 +++++-----
.../pulsar/broker/admin/v2/PersistentTopics.java | 18 +++++-----
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
4 files changed, 55 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4c5c4a836a7..5815e31d535 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -183,6 +183,32 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
+ return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+ .thenAccept(exists -> {
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ }
+ })
+ .thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
+ .thenApply(topics ->
+ topics.stream()
+ .filter(topic -> {
+ if (isTransactionInternalName(TopicName.get(topic))) {
+ return false;
+ }
+ if (bundle.isPresent()) {
+ NamespaceBundle b = pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(TopicName.get(topic));
+ return b != null && bundle.get().equals(b.getBundleRange());
+ }
+ return true;
+ })
+ .collect(Collectors.toList())
+ );
+ }
+
protected CompletableFuture<List<String>> internalGetListAsync() {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
@@ -4249,18 +4275,22 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicMetadataAsync(
TopicName.get(topicName.getPartitionedTopicName()), false, false)
- .thenApply(partitionedTopicMetadata -> {
+ .thenAccept(partitionedTopicMetadata -> {
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata
== null ? "has no metadata" : "has zero partitions";
throw new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
- } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
+ }
+ })
+ .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+ .thenApply(topics -> {
+ if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
throw new RestException(Status.NOT_FOUND,
- getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
- });
+ getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
+ });
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 028c17da3d3..a114bf54cca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -71,14 +71,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
- try {
- validateNamespaceName(property, cluster, namespace);
- asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 1fa09d747a1..bac4145871b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -100,14 +100,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("bundle") String bundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
- try {
- validateNamespaceName(tenant, namespace);
- asyncResponse.resume(filterSystemTopic(internalGetList(Optional.ofNullable(bundle)), includeSystemTopic));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 425acf6c664..2a4e204c044 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -792,7 +792,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getList(response, property, cluster, namespace, null);
- verify(response, times(1)).resume(Lists.newArrayList());
+ verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
// create topic
response = mock(AsyncResponse.class);
persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);