You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/19 05:10:36 UTC

[pulsar] branch master updated: Handle web application exception to redirect request (#9228)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aaa27c3  Handle web application exception to redirect request (#9228)
aaa27c3 is described below

commit aaa27c3fd2dfe75669564ccc00c50a9aed279915
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jan 18 21:10:06 2021 -0800

    Handle web application exception to redirect request (#9228)
    
    *Motivation*
    
    `validateNamespaceBundleOwnership` throws a web application exception with `Response.temporaryRedirect(redirect)`
    to redirect the request to the owner broker. But the web application exception is a runtime exception. If you don't
    handle and propagate it correctly, it will cause web request to be hang.
    
    PR #8746 changed some web calls to async but it doesn't handle the web application exception. It causes `topics list`
    with non-persistent topics to be hang.
    
    This pull request make sure the callers of `validateNamespaceBundleOwnership` catch and propagate the exceptions to
    web response.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 26 +++++---
 .../broker/admin/v1/NonPersistentTopics.java       |  8 ++-
 .../broker/admin/v2/NonPersistentTopics.java       | 19 +++++-
 .../pulsar/broker/service/TopicOwnerTest.java      | 77 ++++++++++------------
 4 files changed, 74 insertions(+), 56 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 8d0fc05..cfda798 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -539,9 +539,9 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(e);
         }
 
-        NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
         try {
+            NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                authoritative, true);
             List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
             for (String topic : topics) {
                 NamespaceBundle topicBundle = pulsar().getNamespaceService()
@@ -613,10 +613,9 @@ public abstract class NamespacesBase extends AdminResource {
             throw new RestException(e);
         }
 
-        NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
-
         try {
+            NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                authoritative, true);
             // directly remove from owned namespace map and ephemeral node from ZK
             pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
         } catch (WebApplicationException wae) {
@@ -1368,8 +1367,15 @@ public abstract class NamespacesBase extends AdminResource {
                 asyncResponse.resume(Response.noContent().build());
                 return;
             }
-            NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+            NamespaceBundle nsBundle;
+
+            try {
+                nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
                     authoritative, true);
+            } catch (WebApplicationException wae) {
+                asyncResponse.resume(wae);
+                return;
+            }
 
             pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
                     .thenRun(() -> {
@@ -1409,8 +1415,6 @@ public abstract class NamespacesBase extends AdminResource {
         }
 
         validatePoliciesReadOnlyAccess();
-        NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
 
         List<String> supportedNamespaceBundleSplitAlgorithms =
                 pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
@@ -1422,9 +1426,13 @@ public abstract class NamespacesBase extends AdminResource {
         }
 
         try {
+            NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                authoritative, true);
             pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
-                    getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
+                getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
             log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
+        } catch (WebApplicationException wae) {
+            throw wae;
         } catch (ExecutionException e) {
             if (e.getCause() instanceof IllegalArgumentException) {
                 log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 06e85df..7c596ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -266,13 +266,13 @@ public class NonPersistentTopics extends PersistentTopics {
         NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
         try {
             if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
-                    .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
+                .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
                 log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property,
-                        cluster, namespace, bundleRange);
+                    cluster, namespace, bundleRange);
                 return null;
             }
             NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange,
-                    true, true);
+                true, true);
             final List<String> topicList = Lists.newArrayList();
             pulsar().getBrokerService().forEachTopic(topic -> {
                 TopicName topicName = TopicName.get(topic.getName());
@@ -281,6 +281,8 @@ public class NonPersistentTopics extends PersistentTopics {
                 }
             });
             return topicList;
+        } catch (WebApplicationException wae) {
+            throw wae;
         } catch (Exception e) {
             log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
             throw new RestException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 633b1b2..609b27f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -336,8 +336,14 @@ public class NonPersistentTopics extends PersistentTopics {
                         bundleRange);
                 asyncResponse.resume(Response.noContent().build());
             } else {
-                NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
+                NamespaceBundle nsBundle;
+                try {
+                    nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
                         bundleRange, true, true);
+                } catch (WebApplicationException wae) {
+                    asyncResponse.resume(wae);
+                    return;
+                }
                 try {
                     final List<String> topicList = Lists.newArrayList();
                     pulsar().getBrokerService().forEachTopic(topic -> {
@@ -348,11 +354,20 @@ public class NonPersistentTopics extends PersistentTopics {
                     });
                     asyncResponse.resume(topicList);
                 } catch (Exception e) {
-                    log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(),
+                    log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
                             namespaceName, bundleRange, e);
                     asyncResponse.resume(new RestException(e));
                 }
             }
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
+                namespaceName, bundleRange, ex);
+            if (ex.getCause() instanceof WebApplicationException) {
+                asyncResponse.resume(ex.getCause());
+            } else {
+                asyncResponse.resume(new RestException(ex.getCause()));
+            }
+            return null;
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 177ed3a..e21b5d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
 import com.google.common.collect.Sets;
 
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -82,6 +83,9 @@ public class TopicOwnerTest {
     protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
     protected PulsarService leaderPulsar;
     protected PulsarAdmin leaderAdmin;
+    protected String testCluster = "my-cluster";
+    protected String testTenant = "my-tenant";
+    protected String testNamespace = testTenant + "/my-ns";
 
     @BeforeMethod
     void setup() throws Exception {
@@ -117,6 +121,12 @@ public class TopicOwnerTest {
         leaderPulsar = pulsarServices[0];
         leaderAdmin = pulsarAdmins[0];
         Thread.sleep(1000);
+
+        pulsarAdmins[0].clusters().createCluster(testCluster, new ClusterData(pulsarServices[0].getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Sets.newHashSet(testCluster));
+        pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
+        pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
     }
 
     @AfterMethod(alwaysRun = true)
@@ -231,12 +241,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
         NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -286,12 +290,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
         NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -340,12 +338,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
         NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -399,12 +391,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
         NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -458,12 +444,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         String topic1 = "persistent://my-tenant/my-ns/topic-1";
         NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
         NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -517,12 +497,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testConnectToInvalidateBundleCacheBroker() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(), 16);
 
         final String topic1 = "persistent://my-tenant/my-ns/topic-1";
@@ -554,12 +528,6 @@ public class TopicOwnerTest {
 
     @Test
     public void testLookupPartitionedTopic() throws Exception {
-        pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
-        TenantInfo tenantInfo = new TenantInfo();
-        tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
-        pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
-        pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
         final int partitions = 5;
         final String topic = "persistent://my-tenant/my-ns/partitionedTopic";
 
@@ -581,4 +549,29 @@ public class TopicOwnerTest {
         }
 
     }
+
+    @Test
+    public void testListNonPersistentTopic() throws Exception {
+        final String topicName = "non-persistent://my-tenant/my-ns/my-topic";
+        pulsarAdmins[0].topics().createPartitionedTopic(topicName, 16);
+
+        PulsarClient client = PulsarClient.builder().
+                serviceUrl(pulsarServices[0].getBrokerServiceUrl())
+                .build();
+
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic(topicName)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        List<String> topics = pulsarAdmins[0].topics().getList("my-tenant/my-ns");
+        Assert.assertEquals(topics.size(), 16);
+        for (String topic : topics) {
+            Assert.assertTrue(topic.contains("non-persistent"));
+            Assert.assertTrue(topic.contains("my-tenant/my-ns/my-topic"));
+        }
+
+        consumer.close();
+        client.close();
+    }
 }