You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/09 10:35:27 UTC

[pulsar] branch branch-2.10 updated: [cherry-pick][branch-2.10] make getList async (#18819)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 96dbfbfa498 [cherry-pick][branch-2.10] make getList async (#18819)
96dbfbfa498 is described below

commit 96dbfbfa498d61eb6ce9f89c77090dbe8658fab5
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Fri Dec 9 18:35:20 2022 +0800

    [cherry-pick][branch-2.10] make getList async (#18819)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 35 ++++++++++++++++++++--
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 18 ++++++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 18 ++++++-----
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  2 ++
 .../systopic/PartitionedSystemTopicTest.java       |  3 +-
 6 files changed, 57 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 698da80265d..4fa7ff79bfe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -185,6 +185,32 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
+    protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+            .thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
+            .thenAccept(exists -> {
+                if (!exists) {
+                    throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
+                }
+            })
+            .thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
+            .thenApply(topics ->
+                topics.stream()
+                    .filter(topic -> {
+                        if (isTransactionInternalName(TopicName.get(topic))) {
+                            return false;
+                        }
+                        if (bundle.isPresent()) {
+                            NamespaceBundle b = pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                .getBundle(TopicName.get(topic));
+                            return b != null && bundle.get().equals(b.getBundleRange());
+                        }
+                        return true;
+                    })
+                    .collect(Collectors.toList())
+            );
+    }
+
     protected List<String> internalGetPartitionedTopicList() {
         validateNamespaceOperation(namespaceName, NamespaceOperation.GET_TOPICS);
         // Validate that namespace exists, throws 404 if it doesn't exist
@@ -203,6 +229,7 @@ public class PersistentTopicsBase extends AdminResource {
         return getPartitionedTopicList(TopicDomain.getEnum(domain()));
     }
 
+
     protected Map<String, Set<AuthAction>> internalGetPermissionsOnTopic() {
         // This operation should be reading from zookeeper and it should be allowed without having admin privileges
         validateAdminAccessForTenant(namespaceName.getTenant());
@@ -4097,13 +4124,17 @@ public class PersistentTopicsBase extends AdminResource {
 
         return getPartitionedTopicMetadataAsync(
                 TopicName.get(topicName.getPartitionedTopicName()), false, false)
-                .thenApply(partitionedTopicMetadata -> {
+                .thenAccept(partitionedTopicMetadata -> {
                     if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
                         final String topicErrorType = partitionedTopicMetadata
                                 == null ? "has no metadata" : "has zero partitions";
                         throw new RestException(Status.NOT_FOUND, String.format(
                                 "Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
-                    } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
+                    }
+                })
+                .thenCompose(__ -> internalGetListAsync(Optional.empty()))
+                .thenApply(topics -> {
+                    if (!topics.contains(topicName.toString())) {
                         throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
                     }
                     throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index ba3a69cc0be..2009de113d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -77,14 +77,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
             @QueryParam("bundle") String bundle) {
-        try {
-            validateNamespaceName(property, cluster, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(property, cluster, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 56e3799c475..a607db8b6aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -101,14 +101,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace,
             @ApiParam(value = "Specify the bundle name", required = false)
             @QueryParam("bundle") String bundle) {
-        try {
-            validateNamespaceName(tenant, namespace);
-            asyncResponse.resume(internalGetList(Optional.ofNullable(bundle)));
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateNamespaceName(tenant, namespace);
+        internalGetListAsync(Optional.ofNullable(bundle))
+            .thenAccept(asyncResponse::resume)
+            .exceptionally(ex -> {
+                if (!isRedirectException(ex)) {
+                    log.error("[{}] Failed to get topic list {}", clientAppId(), namespaceName, ex);
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index f3802235686..cdca5f33a5b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -846,7 +846,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
 
         AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.getList(response, property, cluster, namespace, null);
-        verify(response, times(1)).resume(Lists.newArrayList());
+        verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index e479519f393..89cc626b262 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -86,11 +86,13 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index d8caccfd53d..9920a097e34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -261,7 +260,7 @@ public class PartitionedSystemTopicTest extends BrokerTestBase {
         CompletableFuture<SystemTopicClient.Writer<PulsarEvent>> writer2 = systemTopicClientForNamespace.newWriterAsync();
         CompletableFuture<Void> f1 = admin.topicPolicies().setCompactionThresholdAsync(topic, 1L);
 
-        FutureUtil.waitForAll(Lists.newArrayList(writer1, writer2, f1)).join();
+        CompletableFuture.allOf(writer1, writer2, f1).join();
         Assert.assertTrue(reader1.hasMoreEvents());
         Assert.assertNotNull(reader1.readNext());
         Assert.assertTrue(reader2.hasMoreEvents());