You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/09 10:35:27 UTC
[pulsar] branch branch-2.10 updated: [cherry-pick][branch-2.10] make getList async (#18819)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 96dbfbfa498 [cherry-pick][branch-2.10] make getList async (#18819)
96dbfbfa498 is described below
commit 96dbfbfa498d61eb6ce9f89c77090dbe8658fab5
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 9 18:35:20 2022 +0800
[cherry-pick][branch-2.10] make getList async (#18819)
---
.../broker/admin/impl/PersistentTopicsBase.java | 35 ++++++++++++++++++++--
.../pulsar/broker/admin/v1/PersistentTopics.java | 18 ++++++-----
.../pulsar/broker/admin/v2/PersistentTopics.java | 18 ++++++-----
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
.../pulsar/broker/service/BrokerServiceTest.java | 2 ++
.../systopic/PartitionedSystemTopicTest.java | 3 +-
6 files changed, 57 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 698da80265d..4fa7ff79bfe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -185,6 +185,32 @@ public class PersistentTopicsBase extends AdminResource {
}
}
+ protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
+ return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+ .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+ .thenAccept(exists -> {
+ if (!exists) {
+ throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+ }
+ })
+ .thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
+ .thenApply(topics ->
+ topics.stream()
+ .filter(topic -> {
+ if (isTransactionInternalName(TopicName.get(topic))) {
+ return false;
+ }
+ if (bundle.isPresent()) {
+ NamespaceBundle b = pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundle(TopicName.get(topic));
+ return b != null && bundle.get().equals(b.getBundleRange());
+ }
+ return true;
+ })
+ .collect(Collectors.toList())
+ );
+ }
+
protected List<String> internalGetPartitionedTopicList() {
validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
// Validate that namespace exists, throws 404 if it doesn't exist
@@ -203,6 +229,7 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicList(TopicDomain.getEnum(domain()));
}
+
protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
// This operation should be reading from zookeeper and it should be allowed without having admin privileges
validateAdminAccessForTenant(namespaceName.getTenant());
@@ -4097,13 +4124,17 @@ public class PersistentTopicsBase extends AdminResource {
return getPartitionedTopicMetadataAsync(
TopicName.get(topicName.getPartitionedTopicName()), false, false)
- .thenApply(partitionedTopicMetadata -> {
+ .thenAccept(partitionedTopicMetadata -> {
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata
== null ? "has no metadata" : "has zero partitions";
throw new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
- } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
+ }
+ })
+ .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+ .thenApply(topics -> {
+ if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index ba3a69cc0be..2009de113d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -77,14 +77,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
- try {
- validateNamespaceName(property, cluster, namespace);
- asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(property, cluster, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56e3799c475..a607db8b6aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -101,14 +101,16 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
- try {
- validateNamespaceName(tenant, namespace);
- asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateNamespaceName(tenant, namespace);
+ internalGetListAsync(Optional.ofNullable(bundle))
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f3802235686..cdca5f33a5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -846,7 +846,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getList(response, property, cluster, namespace, null);
- verify(response, times(1)).resume(Lists.newArrayList());
+ verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index e479519f393..89cc626b262 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -86,11 +86,13 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d8caccfd53d..9920a097e34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.systopic;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -261,7 +260,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
- FutureUtil.waitForAll(Lists.newArrayList(writer1, writer2, f1)).join();
+ CompletableFuture.allOf(writer1, writer2, f1).join();
Assert.assertTrue(reader1.hasMoreEvents());
Assert.assertNotNull(reader1.readNext());
Assert.assertTrue(reader2.hasMoreEvents());