You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/21 23:12:57 UTC

[pulsar] branch master updated: Added support to force deleting namespace (#7993)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7505dac  Added support to force deleting namespace (#7993)
7505dac is described below

commit 7505dacf2f731d69d42ca02eb3ce6a2159816995
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Sep 22 07:12:38 2020 +0800

    Added support to force deleting namespace (#7993)
    
    ### Motivation
    
    Fixes #7959
    
    ### Modifications
    
    Add an optional field to force the deletion of everything related to a namespace.
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 234 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |   6 +-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |   6 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  24 +--
 .../org/apache/pulsar/client/admin/Namespaces.java |  72 +++++++
 .../client/admin/internal/NamespacesImpl.java      |  40 ++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   2 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |   8 +-
 9 files changed, 374 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a1d7e3f..030ebe1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,6 +146,14 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
+        if (!force) { // regular deletion path
+            internalDeleteNamespace(asyncResponse, authoritative);
+            return;
+        }
+        internalDeleteNamespaceForcefully(asyncResponse, authoritative); // reached only when force is set
+    }
+
     @SuppressWarnings("deprecation")
     protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
         validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
@@ -310,6 +319,167 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     @SuppressWarnings("deprecation")
+    protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, boolean authoritative) {
+        validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
+        validatePoliciesReadOnlyAccess();
+
+        // ensure that non-global namespace is directed to the correct cluster
+        if (!namespaceName.isGlobal()) {
+            validateClusterOwnership(namespaceName.getCluster());
+        }
+
+        Entry<Policies, Stat> policiesNode = null;
+        Policies policies = null;
+
+        // ensure the local cluster is the only cluster for the global namespace configuration
+        try {
+            policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
+                    () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist."));
+
+            policies = policiesNode.getKey();
+            if (namespaceName.isGlobal()) {
+                if (policies.replication_clusters.size() > 1) {
+                    // There are still more than one clusters configured for the global namespace
+                    throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+                            + namespaceName + ". There are still more than one replication clusters configured.");
+                }
+                if (policies.replication_clusters.size() == 1
+                        && !policies.replication_clusters.contains(config().getClusterName())) {
+                    // the only replication cluster is other cluster, redirect
+                    String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+                    ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
+                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                                    "Cluster " + replCluster + " does not exist"));
+                    URL replClusterUrl;
+                    if (!config().isTlsEnabled() || !isRequestHttps()) {
+                        replClusterUrl = new URL(replClusterData.getServiceUrl());
+                    } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+                    } else {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "The replication cluster does not provide TLS encrypted service");
+                    }
+                    URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+                            .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect,
+                                replCluster);
+                    }
+                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
+            }
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+            return;
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        List<String> topics;
+        try {
+            topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+            topics.addAll(getPartitionedTopicList(TopicDomain.persistent));
+            topics.addAll(getPartitionedTopicList(TopicDomain.non_persistent));
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        // set the policies to deleted so that somebody else cannot acquire this namespace
+        try {
+            policies.deleted = true;
+            globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
+                    policiesNode.getValue().getVersion());
+            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        // remove from owned namespace map and ephemeral node from ZK
+        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        try {
+            // firstly remove all topics including system topics
+            if (!topics.isEmpty()) {
+                for (String topic : topics) {
+                    pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
+                        topicOptional.ifPresent(tp -> futures.add(tp.deleteForcefully()));
+                    });
+                }
+            }
+            // forcefully delete namespace bundles
+            NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
+                    .getBundles(namespaceName);
+            for (NamespaceBundle bundle : bundles.getBundles()) {
+                // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
+                if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
+                    futures.add(pulsar().getAdminClient().namespaces()
+                            .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true));
+                }
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
+
+        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+            if (exception != null) {
+                if (exception.getCause() instanceof PulsarAdminException) {
+                    asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
+                    return null;
+                } else {
+                    log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception);
+                    asyncResponse.resume(new RestException(exception.getCause()));
+                    return null;
+                }
+            }
+
+            try {
+                // remove partitioned topics znode
+                final String globalPartitionedPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString());
+                // check whether partitioned topics znode exist
+                if (zkPathExists(globalPartitionedPath)) {
+                    ZKUtil.deleteRecursive(globalZk(), globalPartitionedPath);
+                    policiesCache().invalidate(globalPartitionedPath);
+                }
+
+                // we have successfully removed all the ownership for the namespace, the policies znode can be deleted
+                // now
+                final String globalZkPolicyPath = path(POLICIES, namespaceName.toString());
+                final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString());
+                globalZk().delete(globalZkPolicyPath, -1);
+
+                try {
+                    localZk().delete(lcaolZkPolicyPath, -1);
+                } catch (NoNodeException nne) {
+                    // If the z-node with the modified information is not there anymore, we're already good
+                }
+
+                policiesCache().invalidate(globalZkPolicyPath);
+                localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
+            } catch (Exception e) {
+                log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e);
+                asyncResponse.resume(new RestException(e));
+                return null;
+            }
+
+            asyncResponse.resume(Response.noContent().build());
+            return null;
+        });
+    }
+
+    protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
+        if (!force) { // regular deletion path
+            internalDeleteNamespaceBundle(bundleRange, authoritative);
+            return;
+        }
+        internalDeleteNamespaceBundleForcefully(bundleRange, authoritative); // reached only when force is set
+    }
+
+    @SuppressWarnings("deprecation")
     protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
         validatePoliciesReadOnlyAccess();
@@ -381,6 +551,70 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    /** Force-removes a bundle: after the cluster checks, releases its ownership with no emptiness check here. */
+    @SuppressWarnings("deprecation")
+    protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
+        validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
+        validatePoliciesReadOnlyAccess();
+
+        // ensure that non-global namespace is directed to the correct cluster
+        if (!namespaceName.isGlobal()) {
+            validateClusterOwnership(namespaceName.getCluster());
+        }
+
+        Policies policies = getNamespacePolicies(namespaceName);
+        // ensure the local cluster is the only cluster for the global namespace configuration
+        try {
+            if (namespaceName.isGlobal()) {
+                if (policies.replication_clusters.size() > 1) {
+                    throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+                            + namespaceName + ". There are still more than one replication clusters configured.");
+                }
+                if (policies.replication_clusters.size() == 1
+                        && !policies.replication_clusters.contains(config().getClusterName())) {
+                    // the only replication cluster is another cluster, redirect the request to it
+                    String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+                    ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
+                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+                                    "Cluster " + replCluster + " does not exist"));
+                    URL replClusterUrl;
+                    if (!config().isTlsEnabled() || !isRequestHttps()) {
+                        replClusterUrl = new URL(replClusterData.getServiceUrl());
+                    } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+                    } else {
+                        throw new RestException(Status.PRECONDITION_FAILED,
+                                "The replication cluster does not provide TLS encrypted service");
+                    }
+                    URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+                            .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+                    }
+                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                }
+            }
+        } catch (WebApplicationException wae) {
+            // rethrow untouched so the generic handler below does not wrap the redirect / REST error
+            throw wae;
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+
+        NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                authoritative, true);
+
+        try {
+            // directly remove from owned namespace map and ephemeral node from ZK
+            pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
+        } catch (WebApplicationException wae) {
+            throw wae;
+        } catch (Exception e) {
+            log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
+                    bundleRange, e);
+            throw new RestException(e);
+        }
+    }
+
     protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
         validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
         checkNotNull(role, "Role should not be null");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 1082429..3fc076b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -196,10 +196,11 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateNamespaceName(property, cluster, namespace);
-            internalDeleteNamespace(asyncResponse, authoritative);
+            internalDeleteNamespace(asyncResponse, authoritative, force);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
@@ -218,9 +219,10 @@ public class Namespaces extends NamespacesBase {
     public void deleteNamespaceBundle(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("bundle") String bundleRange,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateNamespaceName(property, cluster, namespace);
-        internalDeleteNamespaceBundle(bundleRange, authoritative);
+        internalDeleteNamespaceBundle(bundleRange, authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index f162207..2f6b47b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -148,10 +148,11 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Namespace is not empty") })
     public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateNamespaceName(tenant, namespace);
-            internalDeleteNamespace(asyncResponse, authoritative);
+            internalDeleteNamespace(asyncResponse, authoritative, force);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
@@ -169,9 +170,10 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 409, message = "Namespace bundle is not empty") })
     public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
             @PathParam("bundle") String bundleRange,
+            @QueryParam("force") @DefaultValue("false") boolean force,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteNamespaceBundle(bundleRange, authoritative);
+        internalDeleteNamespaceBundle(bundleRange, authoritative, force);
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 4a5cfdb..33891d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -588,7 +588,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         AsyncResponse response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false);
+        namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false, false);
         ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 5158352..d4d7f88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -611,7 +611,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         AsyncResponse response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, this.testTenant, this.testOtherCluster,
-                this.testLocalNamespaces.get(2).getLocalName(), false);
+                this.testLocalNamespaces.get(2).getLocalName(), false, false);
         ArgumentCaptor<WebApplicationException> captor = ArgumentCaptor.forClass(WebApplicationException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -656,7 +656,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, this.testLocalNamespaces.get(2).getTenant(),
-                this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), false);
+                this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), false, false);
         captor = ArgumentCaptor.forClass(WebApplicationException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -667,7 +667,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteNamespaces() throws Exception {
         AsyncResponse response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false);
+        namespaces.deleteNamespace(response, this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false, false);
         ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
         assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
@@ -684,7 +684,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
 
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
         errorCaptor = ArgumentCaptor.forClass(RestException.class);
         // Ok, namespace not empty
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
@@ -698,7 +698,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
                 new byte[0], null, null);
 
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
         errorCaptor = ArgumentCaptor.forClass(RestException.class);
         // Ok, namespace not empty
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
@@ -711,7 +711,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -721,7 +721,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -737,7 +737,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
         doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+        namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -783,14 +783,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         try {
             namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
-                    false);
+                    false, false);
             fail("Should have failed");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
 
         AsyncResponse response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false);
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
@@ -805,14 +805,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         try {
             namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
-                    false);
+                    false, false);
             fail("Should have failed");
         } catch (RestException re) {
             assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
 
         response = mock(AsyncResponse.class);
-        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false);
+        namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
         captor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f4315e5..eb397ab 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -417,6 +417,27 @@ public interface Namespaces {
     void deleteNamespace(String namespace) throws PulsarAdminException;
 
     /**
+     * Delete an existing namespace.
+     * <p/>
+     * Force flag deletes namespace forcefully by force deleting all topics under it.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param force
+     *            Delete namespace forcefully
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws ConflictException
+     *             Namespace is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteNamespace(String namespace, boolean force) throws PulsarAdminException;
+
+    /**
      * Delete an existing namespace asynchronously.
      * <p/>
      * The namespace needs to be empty.
@@ -427,6 +448,18 @@ public interface Namespaces {
     CompletableFuture<Void> deleteNamespaceAsync(String namespace);
 
     /**
+     * Delete an existing namespace asynchronously.
+     * <p/>
+     * Force flag deletes namespace forcefully by force deleting all topics under it.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param force
+     *            Delete namespace forcefully
+     */
+    CompletableFuture<Void> deleteNamespaceAsync(String namespace, boolean force);
+
+    /**
      * Delete an existing bundle in a namespace.
      * <p/>
      * The bundle needs to be empty.
@@ -448,6 +481,29 @@ public interface Namespaces {
     void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException;
 
     /**
+     * Delete an existing bundle in a namespace.
+     * <p/>
+     * Force flag deletes bundle forcefully.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param bundleRange
+     *            range of the bundle
+     * @param force
+     *            Delete bundle forcefully
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission
+     * @throws NotFoundException
+     *             Namespace/bundle does not exist
+     * @throws ConflictException
+     *             Bundle is not empty
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void deleteNamespaceBundle(String namespace, String bundleRange, boolean force) throws PulsarAdminException;
+
+    /**
      * Delete an existing bundle in a namespace asynchronously.
      * <p/>
      * The bundle needs to be empty.
@@ -462,6 +518,22 @@ public interface Namespaces {
     CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange);
 
     /**
+     * Delete an existing bundle in a namespace asynchronously, optionally forcing the deletion.
+     * <p/>
+     * When {@code force} is true, the bundle is deleted forcefully.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param bundleRange
+     *            Range of the bundle
+     * @param force
+     *            Delete bundle forcefully
+     *
+     * @return a future that can be used to track when the bundle is deleted
+     */
+    CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange, boolean force);
+
+    /**
      * Get permissions on a namespace.
      * <p/>
      * Retrieve the permissions for a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 83c6ce4..11269d0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -332,9 +332,29 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void deleteNamespace(String namespace, boolean force) throws PulsarAdminException {
+        try { // synchronous wrapper: wait on the async delete for at most readTimeoutMs milliseconds
+            deleteNamespaceAsync(namespace, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // assumes the cause is a PulsarAdminException - TODO confirm
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // re-set the interrupt flag before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e); // the bounded wait expired
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> deleteNamespaceAsync(String namespace) {
+        return deleteNamespaceAsync(namespace, false); // delegate to the force-aware overload with force off
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteNamespaceAsync(String namespace, boolean force) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns);
+        path = path.queryParam("force", force); // added for both true and false; there is no conditional
         return asyncDeleteRequest(path);
     }
 
@@ -353,9 +373,29 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void deleteNamespaceBundle(String namespace, String bundleRange, boolean force) throws PulsarAdminException {
+        try { // synchronous wrapper: wait on the async delete for at most readTimeoutMs milliseconds
+            deleteNamespaceBundleAsync(namespace, bundleRange, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause(); // assumes the cause is a PulsarAdminException - TODO confirm
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // re-set the interrupt flag before rethrowing
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e); // the bounded wait expired
+        }
+    }
+
+    @Override
     public CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange) {
+        return deleteNamespaceBundleAsync(namespace, bundleRange, false); // delegate with force off
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange, boolean force) {
         NamespaceName ns = NamespaceName.get(namespace);
         WebTarget path = namespacePath(ns, bundleRange);
+        path = path.queryParam("force", force); // added for both true and false; there is no conditional
         return asyncDeleteRequest(path);
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 965887d..1f18e83 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -274,7 +274,7 @@ public class PulsarAdminToolTest {
         verify(mockNamespaces).createNamespace("myprop/clust/ns1");
 
         namespaces.run(split("delete myprop/clust/ns1"));
-        verify(mockNamespaces).deleteNamespace("myprop/clust/ns1");
+        verify(mockNamespaces).deleteNamespace("myprop/clust/ns1", false);
 
         namespaces.run(split("permissions myprop/clust/ns1"));
         verify(mockNamespaces).getPermissions("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 661a56a..0f4f86f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -165,15 +165,19 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Deletes a namespace. The namespace needs to be empty")
+    @Parameters(commandDescription = "Deletes a namespace.")
     private class Delete extends CliCommand {
         @Parameter(description = "tenant/namespace\n", required = true)
         private java.util.List<String> params;
 
+        @Parameter(names = { "-f",
+                "--force" }, description = "Delete namespace forcefully by force deleting all topics under it")
+        private boolean force = false; // initial value is false, i.e. a non-forced delete
+
         @Override
         void run() throws PulsarAdminException {
             String namespace = validateNamespace(params);
-            admin.namespaces().deleteNamespace(namespace);
+            admin.namespaces().deleteNamespace(namespace, force); // forward the CLI force flag to the admin client
         }
     }