You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/19 05:13:30 UTC
[pulsar] branch branch-2.7 updated: Handle web application
exception to redirect request (#9228)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new af3407e Handle web application exception to redirect request (#9228)
af3407e is described below
commit af3407e80c69b95e417ac4b9e933ee6c0f909548
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jan 18 21:10:06 2021 -0800
Handle web application exception to redirect request (#9228)
*Motivation*
`validateNamespaceBundleOwnership` throws a web application exception with `Response.temporaryRedirect(redirect)`
to redirect the request to the owner broker. But the web application exception is a runtime exception. If you don't
handle and propagate it correctly, it will cause web request to be hang.
PR #8746 changed some web calls to async but it doesn't handle the web application exception. It causes `topics list`
with non-persistent topics to be hang.
This pull request make sure the callers of `validateNamespaceBundleOwnership` catch and propagate the exceptions to
web response.
(cherry picked from commit aaa27c3fd2dfe75669564ccc00c50a9aed279915)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 27 +++++---
.../broker/admin/v1/NonPersistentTopics.java | 8 ++-
.../broker/admin/v2/NonPersistentTopics.java | 19 +++++-
.../pulsar/broker/service/TopicOwnerTest.java | 77 ++++++++++------------
4 files changed, 75 insertions(+), 56 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 87d0d2f..2b6e793 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -542,9 +542,9 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(e);
}
- NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
try {
+ NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
for (String topic : topics) {
NamespaceBundle topicBundle = pulsar().getNamespaceService()
@@ -614,10 +614,9 @@ public abstract class NamespacesBase extends AdminResource {
throw new RestException(e);
}
- NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
-
try {
+ NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
// directly remove from owned namespace map and ephemeral node from ZK
pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
} catch (WebApplicationException wae) {
@@ -1316,8 +1315,15 @@ public abstract class NamespacesBase extends AdminResource {
asyncResponse.resume(Response.noContent().build());
return;
}
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ NamespaceBundle nsBundle;
+
+ try {
+ nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ }
pulsar().getNamespaceService().unloadNamespaceBundle(nsBundle)
.thenRun(() -> {
@@ -1356,8 +1362,6 @@ public abstract class NamespacesBase extends AdminResource {
}
validatePoliciesReadOnlyAccess();
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
List<String> supportedNamespaceBundleSplitAlgorithms = pulsar().getConfig().getSupportedNamespaceBundleSplitAlgorithms();
if (StringUtils.isNotBlank(splitAlgorithmName) && !supportedNamespaceBundleSplitAlgorithms.contains(splitAlgorithmName)) {
@@ -1366,8 +1370,13 @@ public abstract class NamespacesBase extends AdminResource {
}
try {
- pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
+ NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
+ pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload,
+ getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName)).get();
log.info("[{}] Successfully split namespace bundle {}", clientAppId(), nsBundle.toString());
+ } catch (WebApplicationException wae) {
+ throw wae;
} catch (ExecutionException e) {
if (e.getCause() instanceof IllegalArgumentException) {
log.error("[{}] Failed to split namespace bundle {}/{} due to {}", clientAppId(), namespaceName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index bc3ddd1..b2aa364 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -264,13 +264,13 @@ public class NonPersistentTopics extends PersistentTopics {
NamespaceName fqnn = NamespaceName.get(property, cluster, namespace);
try {
if (!isBundleOwnedByAnyBroker(fqnn, policies.bundles, bundleRange)
- .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
+ .get(pulsar().getConfig().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
log.info("[{}] Namespace bundle is not owned by any broker {}/{}/{}/{}", clientAppId(), property,
- cluster, namespace, bundleRange);
+ cluster, namespace, bundleRange);
return null;
}
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange,
- true, true);
+ true, true);
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
@@ -279,6 +279,8 @@ public class NonPersistentTopics extends PersistentTopics {
}
});
return topicList;
+ } catch (WebApplicationException wae) {
+ throw wae;
} catch (Exception e) {
log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 59b9dad..3805cf0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -333,8 +333,14 @@ public class NonPersistentTopics extends PersistentTopics {
bundleRange);
asyncResponse.resume(Response.noContent().build());
} else {
- NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
+ NamespaceBundle nsBundle;
+ try {
+ nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles,
bundleRange, true, true);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ }
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().forEachTopic(topic -> {
@@ -345,11 +351,20 @@ public class NonPersistentTopics extends PersistentTopics {
});
asyncResponse.resume(topicList);
} catch (Exception e) {
- log.error("[{}] Failed to unload namespace bundle {}/{}", clientAppId(),
+ log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
namespaceName, bundleRange, e);
asyncResponse.resume(new RestException(e));
}
}
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to list topics on namespace bundle {}/{}", clientAppId(),
+ namespaceName, bundleRange, ex);
+ if (ex.getCause() instanceof WebApplicationException) {
+ asyncResponse.resume(ex.getCause());
+ } else {
+ asyncResponse.resume(new RestException(ex.getCause()));
+ }
+ return null;
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
index 177ed3a..e21b5d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicOwnerTest.java
@@ -27,6 +27,7 @@ import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -82,6 +83,9 @@ public class TopicOwnerTest {
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected PulsarService leaderPulsar;
protected PulsarAdmin leaderAdmin;
+ protected String testCluster = "my-cluster";
+ protected String testTenant = "my-tenant";
+ protected String testNamespace = testTenant + "/my-ns";
@BeforeMethod
void setup() throws Exception {
@@ -117,6 +121,12 @@ public class TopicOwnerTest {
leaderPulsar = pulsarServices[0];
leaderAdmin = pulsarAdmins[0];
Thread.sleep(1000);
+
+ pulsarAdmins[0].clusters().createCluster(testCluster, new ClusterData(pulsarServices[0].getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Sets.newHashSet(testCluster));
+ pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
+ pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
}
@AfterMethod(alwaysRun = true)
@@ -231,12 +241,6 @@ public class TopicOwnerTest {
@Test
public void testAcquireOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeCreated() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -286,12 +290,6 @@ public class TopicOwnerTest {
@Test
public void testAcquireOwnershipWithZookeeperDisconnectedAfterOwnershipNodeCreated() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -340,12 +338,6 @@ public class TopicOwnerTest {
@Test
public void testReestablishOwnershipAfterInvalidateCache() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -399,12 +391,6 @@ public class TopicOwnerTest {
@Test
public void testReleaseOwnershipWithZookeeperDisconnectedBeforeOwnershipNodeDeleted() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -458,12 +444,6 @@ public class TopicOwnerTest {
@Test
public void testReleaseOwnershipWithZookeeperDisconnectedAfterOwnershipNodeDeleted() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
String topic1 = "persistent://my-tenant/my-ns/topic-1";
NamespaceService leaderNamespaceService = leaderPulsar.getNamespaceService();
NamespaceBundle namespaceBundle = leaderNamespaceService.getBundle(TopicName.get(topic1));
@@ -517,12 +497,6 @@ public class TopicOwnerTest {
@Test
public void testConnectToInvalidateBundleCacheBroker() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
Assert.assertEquals(pulsarAdmins[0].namespaces().getPolicies("my-tenant/my-ns").bundles.getNumBundles(), 16);
final String topic1 = "persistent://my-tenant/my-ns/topic-1";
@@ -554,12 +528,6 @@ public class TopicOwnerTest {
@Test
public void testLookupPartitionedTopic() throws Exception {
- pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
- TenantInfo tenantInfo = new TenantInfo();
- tenantInfo.setAllowedClusters(Sets.newHashSet("my-cluster"));
- pulsarAdmins[0].tenants().createTenant("my-tenant", tenantInfo);
- pulsarAdmins[0].namespaces().createNamespace("my-tenant/my-ns", 16);
-
final int partitions = 5;
final String topic = "persistent://my-tenant/my-ns/partitionedTopic";
@@ -581,4 +549,29 @@ public class TopicOwnerTest {
}
}
+
+ @Test
+ public void testListNonPersistentTopic() throws Exception {
+ final String topicName = "non-persistent://my-tenant/my-ns/my-topic";
+ pulsarAdmins[0].topics().createPartitionedTopic(topicName, 16);
+
+ PulsarClient client = PulsarClient.builder().
+ serviceUrl(pulsarServices[0].getBrokerServiceUrl())
+ .build();
+
+ Consumer<byte[]> consumer = client.newConsumer()
+ .topic(topicName)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ List<String> topics = pulsarAdmins[0].topics().getList("my-tenant/my-ns");
+ Assert.assertEquals(topics.size(), 16);
+ for (String topic : topics) {
+ Assert.assertTrue(topic.contains("non-persistent"));
+ Assert.assertTrue(topic.contains("my-tenant/my-ns/my-topic"));
+ }
+
+ consumer.close();
+ client.close();
+ }
}