You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/09 12:46:58 UTC

[pulsar] branch branch-2.8 updated: Fix delete namespace issue. (#14619)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 0a5d41f  Fix delete namespace issue. (#14619)
0a5d41f is described below

commit 0a5d41feffd8bce885f0e65a92fe5d8a7d2c7631
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Mar 9 20:44:48 2022 +0800

    Fix delete namespace issue. (#14619)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 14 ++++--
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 57 +++++++++++++---------
 .../apache/pulsar/broker/admin/NamespacesTest.java | 23 +++++----
 3 files changed, 55 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 04e8c50..066592a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -780,12 +781,15 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
-        if (throwable instanceof WebApplicationException) {
-            asyncResponse.resume(throwable);
-        } else if (throwable instanceof BrokerServiceException.NotAllowedException) {
-            asyncResponse.resume(new RestException(Status.CONFLICT, throwable));
+        Throwable realCause = FutureUtil.unwrapCompletionException(throwable);
+        if (realCause instanceof WebApplicationException) {
+            asyncResponse.resume(realCause);
+        } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
+            asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+        } else if (realCause instanceof PulsarAdminException) {
+            asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
         } else {
-            asyncResponse.resume(new RestException(throwable));
+            asyncResponse.resume(new RestException(realCause));
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 9b45cb0..e5580c8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -269,31 +269,44 @@ public abstract class NamespacesBase extends AdminResource {
 
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-        try {
-            // remove system topics first.
-            if (!topics.isEmpty()) {
-                for (String topic : topics) {
-                    pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
-                        topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
-                    });
-                }
-            }
-            NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundles(namespaceName);
-            for (NamespaceBundle bundle : bundles.getBundles()) {
-                // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
-                if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
-                    futures.add(pulsar().getAdminClient().namespaces()
-                            .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
+        // remove system topics first.
+        if (!topics.isEmpty()) {
+            for (String topic : topics) {
+                try {
+                    futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                } catch (Exception ex) {
+                    log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
+                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
+                    return;
                 }
             }
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
-            asyncResponse.resume(new RestException(e));
-            return;
         }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+        FutureUtil.waitForAll(futures).thenCompose(__ -> {
+            List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
+            return pulsar().getNamespaceService().getNamespaceBundleFactory()
+                    .getBundlesAsync(namespaceName).thenCompose(bundles -> {
+                        for (NamespaceBundle bundle : bundles.getBundles()) {
+                            // check if the bundle is owned by any broker, if not then we do not need to delete
+                            // the bundle
+                            deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle)
+                                    .thenCompose(ownership -> {
+                                if (ownership.isPresent()) {
+                                    try {
+                                        return pulsar().getAdminClient().namespaces()
+                                                .deleteNamespaceBundleAsync(namespaceName.toString(),
+                                                        bundle.getBundleRange());
+                                    } catch (PulsarServerException e) {
+                                        throw new RestException(e);
+                                    }
+                                } else {
+                                    return CompletableFuture.completedFuture(null);
+                                }
+                            }));
+                        }
+                        return FutureUtil.waitForAll(deleteBundleFutures);
+                    });
+        })
+        .handle((result, exception) -> {
             if (exception != null) {
                 if (exception.getCause() instanceof PulsarAdminException) {
                     asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index ed04b50..1b465cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -780,7 +780,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteNamespaceWithBundles() throws Exception {
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
-        String bundledNsLocal = "test-bundled-namespace-1";
+        String bundledNsLocal = "test-delete-namespace-with-bundles";
         List<String> boundaries = Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
                 .boundaries(boundaries)
@@ -805,8 +805,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 return bundle.getNamespaceObject().equals(testNs);
             }
         }));
-        doReturn(Optional.of(new NamespaceEphemeralData())).when(nsSvc)
-                .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
+        doReturn(CompletableFuture.completedFuture(Optional.of(new NamespaceEphemeralData()))).when(nsSvc)
+                .getOwnerAsync(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
                     @Override
                     public boolean matches(NamespaceBundle bundle) {
                         return bundle.getNamespaceObject().equals(testNs);
@@ -829,14 +829,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-
+        NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+        doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
         AsyncResponse response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
 
-        NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+
         // make one bundle owned
         LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), optionsHttps);
@@ -851,19 +852,17 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
-        captor = ArgumentCaptor.forClass(RestException.class);
-        verify(response, timeout(5000).times(1)).resume(captor.capture());
-        assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
 
         // ensure all three bundles are owned by the local broker
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
-            doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, optionsHttps);
             doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
         }
-        doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString());
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
+        ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(captor2.capture());
+        assertEquals(captor2.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
     }
 
     @Test