You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/21 23:12:57 UTC
[pulsar] branch master updated: Added support to force deleting
namespace (#7993)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7505dac Added support to force deleting namespace (#7993)
7505dac is described below
commit 7505dacf2f731d69d42ca02eb3ce6a2159816995
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue Sep 22 07:12:38 2020 +0800
Added support to force deleting namespace (#7993)
### Motivation
Fixes #7959
### Modifications
Add a optional field to force the deletion of all stuffs related to namespace.
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 234 +++++++++++++++++++++
.../apache/pulsar/broker/admin/v1/Namespaces.java | 6 +-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 6 +-
.../org/apache/pulsar/broker/admin/AdminTest.java | 2 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 24 +--
.../org/apache/pulsar/client/admin/Namespaces.java | 72 +++++++
.../client/admin/internal/NamespacesImpl.java | 40 ++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +-
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 8 +-
9 files changed, 374 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index a1d7e3f..030ebe1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -97,6 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,6 +146,14 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative, boolean force) {
+ if (force) {
+ internalDeleteNamespaceForcefully(asyncResponse, authoritative);
+ } else {
+ internalDeleteNamespace(asyncResponse, authoritative);
+ }
+ }
+
@SuppressWarnings("deprecation")
protected void internalDeleteNamespace(AsyncResponse asyncResponse, boolean authoritative) {
validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
@@ -310,6 +319,167 @@ public abstract class NamespacesBase extends AdminResource {
}
@SuppressWarnings("deprecation")
+ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, boolean authoritative) {
+ validateTenantOperation(namespaceName.getTenant(), TenantOperation.DELETE_NAMESPACE);
+ validatePoliciesReadOnlyAccess();
+
+ // ensure that non-global namespace is directed to the correct cluster
+ if (!namespaceName.isGlobal()) {
+ validateClusterOwnership(namespaceName.getCluster());
+ }
+
+ Entry<Policies, Stat> policiesNode = null;
+ Policies policies = null;
+
+ // ensure the local cluster is the only cluster for the global namespace configuration
+ try {
+ policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
+ () -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist."));
+
+ policies = policiesNode.getKey();
+ if (namespaceName.isGlobal()) {
+ if (policies.replication_clusters.size() > 1) {
+ // There are still more than one clusters configured for the global namespace
+ throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ + namespaceName + ". There are still more than one replication clusters configured.");
+ }
+ if (policies.replication_clusters.size() == 1
+ && !policies.replication_clusters.contains(config().getClusterName())) {
+ // the only replication cluster is other cluster, redirect
+ String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+ ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ "Cluster " + replCluster + " does not exist"));
+ URL replClusterUrl;
+ if (!config().isTlsEnabled() || !isRequestHttps()) {
+ replClusterUrl = new URL(replClusterData.getServiceUrl());
+ } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+ replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+ } else {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The replication cluster does not provide TLS encrypted service");
+ }
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+ .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect,
+ replCluster);
+ }
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ return;
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ List<String> topics;
+ try {
+ topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+ topics.addAll(getPartitionedTopicList(TopicDomain.persistent));
+ topics.addAll(getPartitionedTopicList(TopicDomain.non_persistent));
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ // set the policies to deleted so that somebody else cannot acquire this namespace
+ try {
+ policies.deleted = true;
+ globalZk().setData(path(POLICIES, namespaceName.toString()), jsonMapper().writeValueAsBytes(policies),
+ policiesNode.getValue().getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete namespace on global ZK {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ // remove from owned namespace map and ephemeral node from ZK
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ try {
+ // firstly remove all topics including system topics
+ if (!topics.isEmpty()) {
+ for (String topic : topics) {
+ pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
+ topicOptional.ifPresent(tp -> futures.add(tp.deleteForcefully()));
+ });
+ }
+ }
+ // forcefully delete namespace bundles
+ NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
+ .getBundles(namespaceName);
+ for (NamespaceBundle bundle : bundles.getBundles()) {
+ // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
+ if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
+ futures.add(pulsar().getAdminClient().namespaces()
+ .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true));
+ }
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
+ return null;
+ } else {
+ log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception);
+ asyncResponse.resume(new RestException(exception.getCause()));
+ return null;
+ }
+ }
+
+ try {
+ // remove partitioned topics znode
+ final String globalPartitionedPath = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString());
+ // check whether partitioned topics znode exist
+ if (zkPathExists(globalPartitionedPath)) {
+ ZKUtil.deleteRecursive(globalZk(), globalPartitionedPath);
+ policiesCache().invalidate(globalPartitionedPath);
+ }
+
+ // we have successfully removed all the ownership for the namespace, the policies znode can be deleted
+ // now
+ final String globalZkPolicyPath = path(POLICIES, namespaceName.toString());
+ final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, namespaceName.toString());
+ globalZk().delete(globalZkPolicyPath, -1);
+
+ try {
+ localZk().delete(lcaolZkPolicyPath, -1);
+ } catch (NoNodeException nne) {
+ // If the z-node with the modified information is not there anymore, we're already good
+ }
+
+ policiesCache().invalidate(globalZkPolicyPath);
+ localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
+ } catch (Exception e) {
+ log.error("[{}] Failed to remove owned namespace {} from ZK", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ }
+
+ protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
+ if (force) {
+ internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
+ } else {
+ internalDeleteNamespaceBundle(bundleRange, authoritative);
+ }
+ }
+
+ @SuppressWarnings("deprecation")
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
validatePoliciesReadOnlyAccess();
@@ -381,6 +551,70 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ @SuppressWarnings("deprecation")
+ protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
+ validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
+ validatePoliciesReadOnlyAccess();
+
+ // ensure that non-global namespace is directed to the correct cluster
+ if (!namespaceName.isGlobal()) {
+ validateClusterOwnership(namespaceName.getCluster());
+ }
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ // ensure the local cluster is the only cluster for the global namespace configuration
+ try {
+ if (namespaceName.isGlobal()) {
+ if (policies.replication_clusters.size() > 1) {
+ // There are still more than one clusters configured for the global namespace
+ throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ + namespaceName + ". There are still more than one replication clusters configured.");
+ }
+ if (policies.replication_clusters.size() == 1
+ && !policies.replication_clusters.contains(config().getClusterName())) {
+ // the only replication cluster is other cluster, redirect
+ String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+ ClusterData replClusterData = clustersCache().get(AdminResource.path("clusters", replCluster))
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND,
+ "Cluser " + replCluster + " does not exist"));
+ URL replClusterUrl;
+ if (!config().isTlsEnabled() || !isRequestHttps()) {
+ replClusterUrl = new URL(replClusterData.getServiceUrl());
+ } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+ replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+ } else {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The replication cluster does not provide TLS encrypted service");
+ }
+ URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+ .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
+ if(log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}: cluster={}", clientAppId(), redirect, replCluster);
+ }
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ }
+ }
+ } catch (WebApplicationException wae) {
+ throw wae;
+ } catch (Exception e) {
+ throw new RestException(e);
+ }
+
+ NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
+
+ try {
+ // directly remove from owned namespace map and ephemeral node from ZK
+ pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
+ } catch (WebApplicationException wae) {
+ throw wae;
+ } catch (Exception e) {
+ log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
+ bundleRange, e);
+ throw new RestException(e);
+ }
+ }
+
protected void internalGrantPermissionOnNamespace(String role, Set<AuthAction> actions) {
validateNamespaceOperation(namespaceName, NamespaceOperation.GRANT_PERMISSION);
checkNotNull(role, "Role should not be null");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 1082429..3fc076b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -196,10 +196,11 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ @QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(property, cluster, namespace);
- internalDeleteNamespace(asyncResponse, authoritative);
+ internalDeleteNamespace(asyncResponse, authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
@@ -218,9 +219,10 @@ public class Namespaces extends NamespacesBase {
public void deleteNamespaceBundle(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
+ @QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
- internalDeleteNamespaceBundle(bundleRange, authoritative);
+ internalDeleteNamespaceBundle(bundleRange, authoritative, force);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index f162207..2f6b47b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -148,10 +148,11 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Namespace is not empty") })
public void deleteNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
+ @QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(tenant, namespace);
- internalDeleteNamespace(asyncResponse, authoritative);
+ internalDeleteNamespace(asyncResponse, authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
@@ -169,9 +170,10 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 409, message = "Namespace bundle is not empty") })
public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
+ @QueryParam("force") @DefaultValue("false") boolean force,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
- internalDeleteNamespaceBundle(bundleRange, authoritative);
+ internalDeleteNamespaceBundle(bundleRange, authoritative, force);
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 4a5cfdb..33891d6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -588,7 +588,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
}
AsyncResponse response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false);
+ namespaces.deleteNamespace(response, "my-tenant", "use", "my-namespace", false, false);
ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 5158352..d4d7f88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -611,7 +611,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
AsyncResponse response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, this.testTenant, this.testOtherCluster,
- this.testLocalNamespaces.get(2).getLocalName(), false);
+ this.testLocalNamespaces.get(2).getLocalName(), false, false);
ArgumentCaptor<WebApplicationException> captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -656,7 +656,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, this.testLocalNamespaces.get(2).getTenant(),
- this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), false);
+ this.testLocalNamespaces.get(2).getCluster(), this.testLocalNamespaces.get(2).getLocalName(), false, false);
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
@@ -667,7 +667,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteNamespaces() throws Exception {
AsyncResponse response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false);
+ namespaces.deleteNamespace(response, this.testTenant, this.testLocalCluster, "non-existing-namespace-1", false, false);
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
@@ -684,7 +684,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+ namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
errorCaptor = ArgumentCaptor.forClass(RestException.class);
// Ok, namespace not empty
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
@@ -698,7 +698,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
new byte[0], null, null);
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+ namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
errorCaptor = ArgumentCaptor.forClass(RestException.class);
// Ok, namespace not empty
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
@@ -711,7 +711,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+ namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -721,7 +721,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+ namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -737,7 +737,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(testNs, options);
doReturn(true).when(nsSvc).isServiceUnitOwned(testNs);
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false);
+ namespaces.deleteNamespace(response, testNs.getTenant(), testNs.getCluster(), testNs.getLocalName(), false, false);
responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
@@ -783,14 +783,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
try {
namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
- false);
+ false, false);
fail("Should have failed");
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
AsyncResponse response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false);
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
@@ -805,14 +805,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
try {
namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
- false);
+ false, false);
fail("Should have failed");
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
response = mock(AsyncResponse.class);
- namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false);
+ namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
captor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index f4315e5..eb397ab 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -417,6 +417,27 @@ public interface Namespaces {
void deleteNamespace(String namespace) throws PulsarAdminException;
/**
+ * Delete an existing namespace.
+ * <p/>
+ * Force flag deletes namespace forcefully by force deleting all topics under it.
+ *
+ * @param namespace
+ * Namespace name
+ * @param force
+ * Delete namespace forcefully
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws ConflictException
+ * Namespace is not empty
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteNamespace(String namespace, boolean force) throws PulsarAdminException;
+
+ /**
* Delete an existing namespace asynchronously.
* <p/>
* The namespace needs to be empty.
@@ -427,6 +448,18 @@ public interface Namespaces {
CompletableFuture<Void> deleteNamespaceAsync(String namespace);
/**
+ * Delete an existing namespace asynchronously.
+ * <p/>
+ * Force flag deletes namespace forcefully by force deleting all topics under it.
+ *
+ * @param namespace
+ * Namespace name
+ * @param force
+ * Delete namespace forcefully
+ */
+ CompletableFuture<Void> deleteNamespaceAsync(String namespace, boolean force);
+
+ /**
* Delete an existing bundle in a namespace.
* <p/>
* The bundle needs to be empty.
@@ -448,6 +481,29 @@ public interface Namespaces {
void deleteNamespaceBundle(String namespace, String bundleRange) throws PulsarAdminException;
/**
+ * Delete an existing bundle in a namespace.
+ * <p/>
+ * Force flag deletes bundle forcefully.
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundleRange
+ * range of the bundle
+ * @param force
+ * Delete bundle forcefully
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission
+ * @throws NotFoundException
+ * Namespace/bundle does not exist
+ * @throws ConflictException
+ * Bundle is not empty
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void deleteNamespaceBundle(String namespace, String bundleRange, boolean force) throws PulsarAdminException;
+
+ /**
* Delete an existing bundle in a namespace asynchronously.
* <p/>
* The bundle needs to be empty.
@@ -462,6 +518,22 @@ public interface Namespaces {
CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange);
/**
+ * Delete an existing bundle in a namespace asynchronously.
+ * <p/>
+ * Force flag deletes bundle forcefully.
+ *
+ * @param namespace
+ * Namespace name
+ * @param bundleRange
+ * range of the bundle
+ * @param force
+ * Delete bundle forcefully
+ *
+ * @return a future that can be used to track when the bundle is deleted
+ */
+ CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange, boolean force);
+
+ /**
* Get permissions on a namespace.
* <p/>
* Retrieve the permissions for a namespace.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 83c6ce4..11269d0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -332,9 +332,29 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void deleteNamespace(String namespace, boolean force) throws PulsarAdminException {
+ try {
+ deleteNamespaceAsync(namespace, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Void> deleteNamespaceAsync(String namespace) {
+ return deleteNamespaceAsync(namespace, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteNamespaceAsync(String namespace, boolean force) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns);
+ path = path.queryParam("force", force);
return asyncDeleteRequest(path);
}
@@ -353,9 +373,29 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void deleteNamespaceBundle(String namespace, String bundleRange, boolean force) throws PulsarAdminException {
+ try {
+ deleteNamespaceBundleAsync(namespace, bundleRange, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange) {
+ return deleteNamespaceBundleAsync(namespace, bundleRange, false);
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteNamespaceBundleAsync(String namespace, String bundleRange, boolean force) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, bundleRange);
+ path = path.queryParam("force", force);
return asyncDeleteRequest(path);
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 965887d..1f18e83 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -274,7 +274,7 @@ public class PulsarAdminToolTest {
verify(mockNamespaces).createNamespace("myprop/clust/ns1");
namespaces.run(split("delete myprop/clust/ns1"));
- verify(mockNamespaces).deleteNamespace("myprop/clust/ns1");
+ verify(mockNamespaces).deleteNamespace("myprop/clust/ns1", false);
namespaces.run(split("permissions myprop/clust/ns1"));
verify(mockNamespaces).getPermissions("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 661a56a..0f4f86f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -165,15 +165,19 @@ public class CmdNamespaces extends CmdBase {
}
}
- @Parameters(commandDescription = "Deletes a namespace. The namespace needs to be empty")
+ @Parameters(commandDescription = "Deletes a namespace.")
private class Delete extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
private java.util.List<String> params;
+ @Parameter(names = { "-f",
+ "--force" }, description = "Delete namespace forcefully by force deleting all topics under it")
+ private boolean force = false;
+
@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
- admin.namespaces().deleteNamespace(namespace);
+ admin.namespaces().deleteNamespace(namespace, force);
}
}