You are viewing a plain-text version of this content; the canonical link was not preserved in this copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/09 12:46:58 UTC
[pulsar] branch branch-2.8 updated: Fix delete namespace issue. (#14619)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 0a5d41f Fix delete namespace issue. (#14619)
0a5d41f is described below
commit 0a5d41feffd8bce885f0e65a92fe5d8a7d2c7631
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Mar 9 20:44:48 2022 +0800
Fix delete namespace issue. (#14619)
---
.../apache/pulsar/broker/admin/AdminResource.java | 14 ++++--
.../pulsar/broker/admin/impl/NamespacesBase.java | 57 +++++++++++++---------
.../apache/pulsar/broker/admin/NamespacesTest.java | 23 +++++----
3 files changed, 55 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04e8c50..066592a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -780,12 +781,15 @@ public abstract class AdminResource extends PulsarWebResource {
}
protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
- if (throwable instanceof WebApplicationException) {
- asyncResponse.resume(throwable);
- } else if (throwable instanceof BrokerServiceException.NotAllowedException) {
- asyncResponse.resume(new RestException(Status.CONFLICT, throwable));
+ Throwable realCause = FutureUtil.unwrapCompletionException(throwable);
+ if (realCause instanceof WebApplicationException) {
+ asyncResponse.resume(realCause);
+ } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+ } else if (realCause instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
- asyncResponse.resume(new RestException(throwable));
+ asyncResponse.resume(new RestException(realCause));
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9b45cb0..e5580c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -269,31 +269,44 @@ public abstract class NamespacesBase extends AdminResource {
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- try {
- // remove system topics first.
- if (!topics.isEmpty()) {
- for (String topic : topics) {
- pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
- topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
- });
- }
- }
- NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
- for (NamespaceBundle bundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
- if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
- futures.add(pulsar().getAdminClient().namespaces()
- .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
+ // remove system topics first.
+ if (!topics.isEmpty()) {
+ for (String topic : topics) {
+ try {
+ futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ } catch (Exception ex) {
+ log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
+ asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
+ return;
}
}
- } catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
- asyncResponse.resume(new RestException(e));
- return;
}
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ FutureUtil.waitForAll(futures).thenCompose(__ -> {
+ List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
+ return pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundlesAsync(namespaceName).thenCompose(bundles -> {
+ for (NamespaceBundle bundle : bundles.getBundles()) {
+ // check if the bundle is owned by any broker, if not then we do not need to delete
+ // the bundle
+ deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle)
+ .thenCompose(ownership -> {
+ if (ownership.isPresent()) {
+ try {
+ return pulsar().getAdminClient().namespaces()
+ .deleteNamespaceBundleAsync(namespaceName.toString(),
+ bundle.getBundleRange());
+ } catch (PulsarServerException e) {
+ throw new RestException(e);
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }));
+ }
+ return FutureUtil.waitForAll(deleteBundleFutures);
+ });
+ })
+ .handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index ed04b50..1b465cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -780,7 +780,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteNamespaceWithBundles() throws Exception {
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
- String bundledNsLocal = "test-bundled-namespace-1";
+ String bundledNsLocal = "test-delete-namespace-with-bundles";
List<String> boundaries = Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff");
BundlesData bundleData = BundlesData.builder()
.boundaries(boundaries)
@@ -805,8 +805,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
return bundle.getNamespaceObject().equals(testNs);
}
}));
- doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc)
- .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
+ doReturn(CompletableFuture.completedFuture(Optional.of(new NamespaceEphemeralData()))).when(nsSvc)
+ .getOwnerAsync(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
@@ -829,14 +829,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
-
+ NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+ doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
AsyncResponse response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
- NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+
// make one bundle owned
LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), optionsHttps);
@@ -851,19 +852,17 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
-
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
- captor = ArgumentCaptor.forClass(RestException.class);
- verify(response, timeout(5000).times(1)).resume(captor.capture());
- assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+ doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
// ensure all three bundles are owned by the local broker
for (NamespaceBundle bundle : nsBundles.getBundles()) {
- doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
}
- doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString());
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
+ ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(captor2.capture());
+ assertEquals(captor2.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
}
@Test