You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/01 03:12:32 UTC

[pulsar] 05/10: Fix delete namespace issue. (#14215)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 30a2ffb7e1209c6f357b6c87a19b67619bb452a4
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Feb 27 11:53:12 2022 +0800

    Fix delete namespace issue. (#14215)
    
    ### Motivation
    
    When we delete a namespace with `force=false`, the request sometimes fails with `Cannot delete non empty bundle`.
    This happens because the system topic deletions and the namespace bundle deletions are both submitted as futures, so their relative order is not guaranteed.
    We should delete the system topics first and then delete the namespace bundles.
    
    ### Modifications
    - Use the admin client to delete system topics.
    - Delete the system topics first, and then delete the namespace bundles.
    - Make `internalClearZkSources` return a `CompletableFuture` instead of resuming the response itself, and make the code more readable.
    
    (cherry picked from commit 145f29af49074acfff1881895de06e00a11658d9)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  3 +
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 93 ++++++++++------------
 .../apache/pulsar/broker/admin/NamespacesTest.java | 24 +++---
 3 files changed, 54 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2aaa685..9935794 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -756,6 +757,8 @@ public abstract class AdminResource extends PulsarWebResource {
             asyncResponse.resume(realCause);
         } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
             asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+        } else if (realCause instanceof PulsarAdminException) {
+            asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
         } else {
             asyncResponse.resume(new RestException(realCause));
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c7a2b89..382128c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -263,52 +263,55 @@ public abstract class NamespacesBase extends AdminResource {
 
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-        try {
-            // remove system topics first.
-            if (!topics.isEmpty()) {
-                for (String topic : topics) {
-                    pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
-                        topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
-                    });
+        // remove system topics first.
+        if (!topics.isEmpty()) {
+            for (String topic : topics) {
+                try {
+                    futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                } catch (Exception ex) {
+                    log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
+                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
+                    return;
                 }
             }
+        }
+        FutureUtil.waitForAll(futures).thenCompose(__ -> {
+            List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
             NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
-                    .getBundles(namespaceName);
+                            .getBundles(namespaceName);
             for (NamespaceBundle bundle : bundles.getBundles()) {
-                // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
-                if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
-                    futures.add(pulsar().getAdminClient().namespaces()
-                            .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
-                }
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
-            asyncResponse.resume(new RestException(e));
-            return;
-        }
-
-        FutureUtil.waitForAll(futures).handle((result, exception) -> {
-            if (exception != null) {
-                if (exception.getCause() instanceof PulsarAdminException) {
-                    asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
-                    return null;
-                } else {
-                    log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception);
-                    asyncResponse.resume(new RestException(exception.getCause()));
-                    return null;
-                }
+                        // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
+                deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
+                    if (ownership.isPresent()) {
+                        try {
+                            return pulsar().getAdminClient().namespaces()
+                                    .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange());
+                        } catch (PulsarServerException e) {
+                            throw new RestException(e);
+                        }
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                }));
             }
-
-            internalClearZkSources(asyncResponse);
-
+            return FutureUtil.waitForAll(deleteBundleFutures);
+        })
+        .thenCompose(__ -> internalClearZkSources())
+        .thenAccept(__ -> {
+            log.info("[{}] Remove namespace successfully {}", clientAppId(), namespaceName);
+            asyncResponse.resume(Response.noContent().build());
+        })
+        .exceptionally(ex -> {
+            log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, ex.getCause());
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
     }
 
     // clear zk-node resources for deleting namespace
-    protected void internalClearZkSources(AsyncResponse asyncResponse) {
+    protected CompletableFuture<Void> internalClearZkSources() {
         // clear resource of `/namespace/{namespaceName}` for zk-node
-        namespaceResources().deleteNamespaceAsync(namespaceName)
+        return namespaceResources().deleteNamespaceAsync(namespaceName)
                 .thenCompose(ignore -> namespaceResources().getPartitionedTopicResources()
                         .clearPartitionedTopicMetadataAsync(namespaceName))
                 // clear resource for manager-ledger z-node
@@ -322,18 +325,8 @@ public abstract class NamespacesBase extends AdminResource {
                 // clear z-node of local policies
                 .thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
                 // clear /loadbalance/bundle-data
-                .thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName))
-                .whenComplete((ignore, ex) -> {
-                    if (ex != null) {
-                        log.warn("[{}] Failed to remove namespace or managed-ledger for {}",
-                                clientAppId(), namespaceName, ex);
-                        asyncResponse.resume(new RestException(ex));
-                    } else {
-                        log.info("[{}] Remove namespace or managed-ledger successfully {}",
-                                clientAppId(), namespaceName);
-                        asyncResponse.resume(Response.noContent().build());
-                    }
-                });
+                .thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName));
+
     }
 
     @SuppressWarnings("deprecation")
@@ -499,7 +492,7 @@ public abstract class NamespacesBase extends AdminResource {
             return;
         }
 
-        FutureUtil.waitForAll(bundleFutures).handle((result, exception) -> {
+        FutureUtil.waitForAll(bundleFutures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> {
             if (exception != null) {
                 if (exception.getCause() instanceof PulsarAdminException) {
                     asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
@@ -511,9 +504,7 @@ public abstract class NamespacesBase extends AdminResource {
                     return null;
                 }
             }
-
-            internalClearZkSources(asyncResponse);
-
+            asyncResponse.resume(Response.noContent().build());
             return null;
         });
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b8ad2f2..7bd2607 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -774,7 +774,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteNamespaceWithBundles() throws Exception {
         URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
-        String bundledNsLocal = "test-bundled-namespace-1";
+        String bundledNsLocal = "test-delete-namespace-with-bundles";
         List<String> boundaries = Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff");
         BundlesData bundleData = BundlesData.builder()
                 .boundaries(boundaries)
@@ -799,8 +799,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 return bundle.getNamespaceObject().equals(testNs);
             }
         }));
-        doReturn(Optional.of(mock(NamespaceEphemeralData.class))).when(nsSvc)
-                .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
+        doReturn(CompletableFuture.completedFuture(Optional.of(mock(NamespaceEphemeralData.class)))).when(nsSvc)
+                .getOwnerAsync(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
                     @Override
                     public boolean matches(NamespaceBundle bundle) {
                         return bundle.getNamespaceObject().equals(testNs);
@@ -823,21 +823,19 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-
+        NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+        doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
         AsyncResponse response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-
-        NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         // make one bundle owned
         LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), optionsHttps);
         doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
         doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
-
         try {
             namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
                     false, false);
@@ -845,19 +843,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-
         response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
-        captor = ArgumentCaptor.forClass(RestException.class);
-        verify(response, timeout(5000).times(1)).resume(captor.capture());
-        assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-
-        // ensure all three bundles are owned by the local broker
+        doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
         for (NamespaceBundle bundle : nsBundles.getBundles()) {
-            doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, optionsHttps);
             doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
         }
-        doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString());
+        ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
+        verify(response, timeout(5000).times(1)).resume(captor2.capture());
+        assertEquals(captor2.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
     }
 
     @Test