You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/01 03:12:32 UTC
[pulsar] 05/10: Fix delete namespace issue. (#14215)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 30a2ffb7e1209c6f357b6c87a19b67619bb452a4
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Feb 27 11:53:12 2022 +0800
Fix delete namespace issue. (#14215)
### Motivation
When a namespace is deleted with `force=false`, the request sometimes fails with `Cannot delete non empty bundle`.
This happens because the system topic deletions and the namespace bundle deletions are both issued as futures, so their relative order is not guaranteed.
The system topics must be deleted first, and the namespace bundles only after those deletions have completed.
### Modifications
- Use the admin client to delete the system topics.
- Delete the system topics first, and then delete the namespace bundles.
- Make `internalClearZkSources` async and make the code more readable.
(cherry picked from commit 145f29af49074acfff1881895de06e00a11658d9)
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 +
.../pulsar/broker/admin/impl/NamespacesBase.java | 93 ++++++++++------------
.../apache/pulsar/broker/admin/NamespacesTest.java | 24 +++---
3 files changed, 54 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 2aaa685..9935794 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -756,6 +757,8 @@ public abstract class AdminResource extends PulsarWebResource {
asyncResponse.resume(realCause);
} else if (realCause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+ } else if (realCause instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
} else {
asyncResponse.resume(new RestException(realCause));
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c7a2b89..382128c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -263,52 +263,55 @@ public abstract class NamespacesBase extends AdminResource {
// remove from owned namespace map and ephemeral node from ZK
final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- try {
- // remove system topics first.
- if (!topics.isEmpty()) {
- for (String topic : topics) {
- pulsar().getBrokerService().getTopicIfExists(topic).whenComplete((topicOptional, ex) -> {
- topicOptional.ifPresent(systemTopic -> futures.add(systemTopic.deleteForcefully()));
- });
+ // remove system topics first.
+ if (!topics.isEmpty()) {
+ for (String topic : topics) {
+ try {
+ futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ } catch (Exception ex) {
+ log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
+ asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
+ return;
}
}
+ }
+ FutureUtil.waitForAll(futures).thenCompose(__ -> {
+ List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
- .getBundles(namespaceName);
+ .getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
- // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
- if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
- futures.add(pulsar().getAdminClient().namespaces()
- .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange()));
- }
- }
- } catch (Exception e) {
- log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, e);
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
- return null;
- } else {
- log.error("[{}] Failed to remove owned namespace {}", clientAppId(), namespaceName, exception);
- asyncResponse.resume(new RestException(exception.getCause()));
- return null;
- }
+ // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
+ deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
+ if (ownership.isPresent()) {
+ try {
+ return pulsar().getAdminClient().namespaces()
+ .deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange());
+ } catch (PulsarServerException e) {
+ throw new RestException(e);
+ }
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ }));
}
-
- internalClearZkSources(asyncResponse);
-
+ return FutureUtil.waitForAll(deleteBundleFutures);
+ })
+ .thenCompose(__ -> internalClearZkSources())
+ .thenAccept(__ -> {
+ log.info("[{}] Remove namespace successfully {}", clientAppId(), namespaceName);
+ asyncResponse.resume(Response.noContent().build());
+ })
+ .exceptionally(ex -> {
+ log.error("[{}] Failed to remove namespace {}", clientAppId(), namespaceName, ex.getCause());
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
// clear zk-node resources for deleting namespace
- protected void internalClearZkSources(AsyncResponse asyncResponse) {
+ protected CompletableFuture<Void> internalClearZkSources() {
// clear resource of `/namespace/{namespaceName}` for zk-node
- namespaceResources().deleteNamespaceAsync(namespaceName)
+ return namespaceResources().deleteNamespaceAsync(namespaceName)
.thenCompose(ignore -> namespaceResources().getPartitionedTopicResources()
.clearPartitionedTopicMetadataAsync(namespaceName))
// clear resource for manager-ledger z-node
@@ -322,18 +325,8 @@ public abstract class NamespacesBase extends AdminResource {
// clear z-node of local policies
.thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
// clear /loadbalance/bundle-data
- .thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName))
- .whenComplete((ignore, ex) -> {
- if (ex != null) {
- log.warn("[{}] Failed to remove namespace or managed-ledger for {}",
- clientAppId(), namespaceName, ex);
- asyncResponse.resume(new RestException(ex));
- } else {
- log.info("[{}] Remove namespace or managed-ledger successfully {}",
- clientAppId(), namespaceName);
- asyncResponse.resume(Response.noContent().build());
- }
- });
+ .thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName));
+
}
@SuppressWarnings("deprecation")
@@ -499,7 +492,7 @@ public abstract class NamespacesBase extends AdminResource {
return;
}
- FutureUtil.waitForAll(bundleFutures).handle((result, exception) -> {
+ FutureUtil.waitForAll(bundleFutures).thenCompose(__ -> internalClearZkSources()).handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
@@ -511,9 +504,7 @@ public abstract class NamespacesBase extends AdminResource {
return null;
}
}
-
- internalClearZkSources(asyncResponse);
-
+ asyncResponse.resume(Response.noContent().build());
return null;
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b8ad2f2..7bd2607 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -774,7 +774,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteNamespaceWithBundles() throws Exception {
URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress());
- String bundledNsLocal = "test-bundled-namespace-1";
+ String bundledNsLocal = "test-delete-namespace-with-bundles";
List<String> boundaries = Lists.newArrayList("0x00000000", "0x80000000", "0xffffffff");
BundlesData bundleData = BundlesData.builder()
.boundaries(boundaries)
@@ -799,8 +799,8 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
return bundle.getNamespaceObject().equals(testNs);
}
}));
- doReturn(Optional.of(mock(NamespaceEphemeralData.class))).when(nsSvc)
- .getOwner(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
+ doReturn(CompletableFuture.completedFuture(Optional.of(mock(NamespaceEphemeralData.class)))).when(nsSvc)
+ .getOwnerAsync(Mockito.argThat(new ArgumentMatcher<NamespaceBundle>() {
@Override
public boolean matches(NamespaceBundle bundle) {
return bundle.getNamespaceObject().equals(testNs);
@@ -823,21 +823,19 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
-
+ NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
+ doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
AsyncResponse response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-
- NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
// make one bundle owned
LookupOptions optionsHttps = LookupOptions.builder().authoritative(false).requestHttps(true).readOnly(false).build();
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(nsBundles.getBundles().get(0), optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
-
try {
namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
false, false);
@@ -845,19 +843,15 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
} catch (RestException re) {
assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
-
response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
- captor = ArgumentCaptor.forClass(RestException.class);
- verify(response, timeout(5000).times(1)).resume(captor.capture());
- assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-
- // ensure all three bundles are owned by the local broker
+ doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
for (NamespaceBundle bundle : nsBundles.getBundles()) {
- doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(bundle, optionsHttps);
doReturn(true).when(nsSvc).isServiceUnitOwned(bundle);
}
- doNothing().when(namespacesAdmin).deleteNamespaceBundle(Mockito.anyString(), Mockito.anyString());
+ ArgumentCaptor<Response> captor2 = ArgumentCaptor.forClass(Response.class);
+ verify(response, timeout(5000).times(1)).resume(captor2.capture());
+ assertEquals(captor2.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
}
@Test