You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/27 06:53:36 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new eeb22ba9631 [improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)
eeb22ba9631 is described below
commit eeb22ba9631d86528bbca8f1825feff7e70272d2
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 14:53:29 2022 +0800
[improve][broker][PIP-149]make getPartitionedTopicList method async (#16217)
---
.../broker/admin/impl/PersistentTopicsBase.java | 29 ++++++++++------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 +++++++++++---
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 +++++++++--
.../org/apache/pulsar/broker/admin/AdminTest.java | 11 +++++---
.../pulsar/broker/admin/PersistentTopicsTest.java | 26 ++++++++++++++-----
5 files changed, 67 insertions(+), 31 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c6fe2fef7a8..f2b4d72c7b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -197,22 +197,19 @@ public class PersistentTopicsBase extends AdminResource {
!isTransactionInternalName(TopicName.get(topic))).collect(Collectors.toList()));
}
- protected List<String> internalGetPartitionedTopicList() {
- validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
- // Validate that namespace exists, throws 404 if it doesn't exist
- try {
- if (!namespaceResources().namespaceExists(namespaceName)) {
- log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist", clientAppId(),
- namespaceName);
- throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
- }
- } catch (RestException e) {
- throw e;
- } catch (Exception e) {
- log.error("[{}] Failed to get partitioned topic list for namespace {}", clientAppId(), namespaceName, e);
- throw new RestException(e);
- }
- return getPartitionedTopicList(TopicDomain.getEnum(domain()));
+ protected CompletableFuture<List<String>> internalGetPartitionedTopicListAsync() {
+ return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+ .thenCompose(namespaceExists -> {
+ // Validate that namespace exists, throws 404 if it doesn't exist
+ if (!namespaceExists) {
+ log.warn("[{}] Failed to get partitioned topic list {}: Namespace does not exist",
+ clientAppId(), namespaceName);
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ } else {
+ return getPartitionedTopicListAsync(TopicDomain.getEnum(domain()));
+ }
+ });
}
protected CompletableFuture<Map<String, Set<AuthAction>>> internalGetPermissionsOnTopic() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index e492f799195..99c10c62f4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -24,7 +24,6 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
@@ -89,10 +88,21 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace doesn't exist")})
- public List<String> getPartitionedTopicList(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
+ public void getPartitionedTopicList(
+ @Suspended AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
- return internalGetPartitionedTopicList();
+ internalGetPartitionedTopicListAsync()
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 88067344949..3f3c34d78d3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -120,7 +120,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
- public List<String> getPartitionedTopicList(
+ public void getPartitionedTopicList(
+ @Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -128,7 +129,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
validateNamespaceName(tenant, namespace);
- return filterSystemTopic(internalGetPartitionedTopicList(), includeSystemTopic);
+ internalGetPartitionedTopicListAsync()
+ .thenAccept(partitionedTopicList -> asyncResponse.resume(
+ filterSystemTopic(partitionedTopicList, includeSystemTopic)))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get partitioned topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 0d2631b00a9..425acf6c664 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -794,14 +794,19 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
persistentTopics.getList(response, property, cluster, namespace, null);
verify(response, times(1)).resume(Lists.newArrayList());
// create topic
- assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
+ response = mock(AsyncResponse.class);
+ persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
+ verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
- assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
- .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
+ response = mock(AsyncResponse.class);
+ persistentTopics.getPartitionedTopicList(response, property, cluster, namespace);
+ verify(response, timeout(5000).times(1))
+ .resume(Lists
+ .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 23fee723654..2fd4389146c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -691,16 +691,30 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
- List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, false);
-
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, false);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ List<String> persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 1);
- Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.persistent.value());
- persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace, true);
+ Assert.assertEquals(TopicName.get(persistentPartitionedTopics.get(0)).getDomain().value(),
+ TopicDomain.persistent.value());
+
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ persistentTopics.getPartitionedTopicList(response, testTenant, testNamespace, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ persistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(persistentPartitionedTopics.size(), 2);
- List<String> nonPersistentPartitionedTopics = nonPersistentTopic.getPartitionedTopicList(testTenant, testNamespace, false);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.getPartitionedTopicList(response, testTenant, testNamespace, false);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ List<String> nonPersistentPartitionedTopics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(nonPersistentPartitionedTopics.size(), 1);
- Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(), TopicDomain.non_persistent.value());
+ Assert.assertEquals(TopicName.get(nonPersistentPartitionedTopics.get(0)).getDomain().value(),
+ TopicDomain.non_persistent.value());
}
@Test