You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/13 11:44:27 UTC
[pulsar] branch master updated: [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 147672f5428 [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)
147672f5428 is described below
commit 147672f5428e910c66dbbf90a2af18b78f8bf104
Author: Zike Yang <zi...@apache.org>
AuthorDate: Wed Jul 13 19:44:19 2022 +0800
[improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)
Master Issue: #14365
### Motivation
Please see #14365
### Modifications
* Make Namespaces.deleteNamespaceBundle async
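* Combine the internalDeleteNamespaceBundle overloads and internalDeleteNamespaceBundleForcefully into a single internalDeleteNamespaceBundleAsync method that takes a `force` flag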
* Make NamespaceService.removeOwnedServiceUnit async (renamed to removeOwnedServiceUnitAsync, returning CompletableFuture<Void>)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 226 ++++++++-------------
.../apache/pulsar/broker/admin/v1/Namespaces.java | 24 ++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 21 +-
.../pulsar/broker/namespace/NamespaceService.java | 7 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 28 ++-
5 files changed, 131 insertions(+), 175 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 677a9532f62..1766874322c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -25,8 +25,10 @@ import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
+import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -551,153 +553,95 @@ public abstract class NamespacesBase extends AdminResource {
});
}
- protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
- if (force) {
- internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
- } else {
- internalDeleteNamespaceBundle(bundleRange, authoritative);
- }
- }
-
@SuppressWarnings("deprecation")
- protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
- validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
- validatePoliciesReadOnlyAccess();
-
- // ensure that non-global namespace is directed to the correct cluster
- if (!namespaceName.isGlobal()) {
- validateClusterOwnership(namespaceName.getCluster());
- }
-
- Policies policies = getNamespacePolicies(namespaceName);
- // ensure the local cluster is the only cluster for the global namespace configuration
- try {
- if (namespaceName.isGlobal()) {
- if (policies.replication_clusters.size() > 1) {
- // There are still more than one clusters configured for the global namespace
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
- + namespaceName + ". There are still more than one replication clusters configured.");
- }
- if (policies.replication_clusters.size() == 1
- && !policies.replication_clusters.contains(config().getClusterName())) {
- // the only replication cluster is other cluster, redirect
- String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
- ClusterData replClusterData =
- clusterResources().getCluster(replCluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Cluster " + replCluster + " does not exist"));
- URL replClusterUrl;
- if (!config().isTlsEnabled() || !isRequestHttps()) {
- replClusterUrl = new URL(replClusterData.getServiceUrl());
- } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
- replClusterUrl = new URL(replClusterData.getServiceUrlTls());
- } else {
- throw new RestException(Status.PRECONDITION_FAILED,
- "The replication cluster does not provide TLS encrypted service");
- }
- URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
- .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Redirecting the rest call to {}: cluster={}",
- clientAppId(), redirect, replCluster);
+ protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
+ boolean force) {
+ return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
+ .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+ .thenCompose(__ -> {
+ if (!namespaceName.isGlobal()) {
+ return validateClusterOwnershipAsync(namespaceName.getCluster());
}
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- throw new RestException(e);
- }
-
- try {
- NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
- List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
- .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
- for (String topic : topics) {
- NamespaceBundle topicBundle = pulsar().getNamespaceService()
- .getBundle(TopicName.get(topic));
- if (bundle.equals(topicBundle)) {
- throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
- }
- }
-
- // remove from owned namespace map and ephemeral node from ZK
- pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
- bundleRange, e);
- throw new RestException(e);
- }
- }
-
- @SuppressWarnings("deprecation")
- protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
- validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
- validatePoliciesReadOnlyAccess();
+ return CompletableFuture.completedFuture(null);
+ })
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenCompose(policies -> {
+ CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+ if (namespaceName.isGlobal()) {
- // ensure that non-global namespace is directed to the correct cluster
- if (!namespaceName.isGlobal()) {
- validateClusterOwnership(namespaceName.getCluster());
- }
+ if (policies.replication_clusters.size() > 1) {
+ // There are still more than one clusters configured for the global namespace
+ throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+ + namespaceName
+ + ". There are still more than one replication clusters configured.");
+ }
+ if (policies.replication_clusters.size() == 1
+ && !policies.replication_clusters.contains(config().getClusterName())) {
+ // the only replication cluster is other cluster, redirect
+ String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+ future = clusterResources().getClusterAsync(replCluster)
+ .thenCompose(clusterData -> {
+ if (clusterData.isEmpty()) {
+ throw new RestException(Status.NOT_FOUND,
+ "Cluster " + replCluster + " does not exist");
+ }
+ ClusterData replClusterData = clusterData.get();
+ URL replClusterUrl;
+ try {
+ if (!config().isTlsEnabled() || !isRequestHttps()) {
+ replClusterUrl = new URL(replClusterData.getServiceUrl());
+ } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+ replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+ } else {
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "The replication cluster does not provide TLS encrypted "
+ + "service");
+ }
+ } catch (MalformedURLException malformedURLException) {
+ throw new RestException(malformedURLException);
+ }
- Policies policies = getNamespacePolicies(namespaceName);
- // ensure the local cluster is the only cluster for the global namespace configuration
- try {
- if (namespaceName.isGlobal()) {
- if (policies.replication_clusters.size() > 1) {
- // There are still more than one clusters configured for the global namespace
- throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
- + namespaceName + ". There are still more than one replication clusters configured.");
- }
- if (policies.replication_clusters.size() == 1
- && !policies.replication_clusters.contains(config().getClusterName())) {
- // the only replication cluster is other cluster, redirect
- String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
- ClusterData replClusterData =
- clusterResources().getCluster(replCluster)
- .orElseThrow(() -> new RestException(Status.NOT_FOUND,
- "Cluster " + replCluster + " does not exist"));
- URL replClusterUrl;
- if (!config().isTlsEnabled() || !isRequestHttps()) {
- replClusterUrl = new URL(replClusterData.getServiceUrl());
- } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
- replClusterUrl = new URL(replClusterData.getServiceUrlTls());
- } else {
- throw new RestException(Status.PRECONDITION_FAILED,
- "The replication cluster does not provide TLS encrypted service");
- }
- URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
- .port(replClusterUrl.getPort())
- .replaceQueryParam("authoritative", false).build();
- if (log.isDebugEnabled()) {
- log.debug("[{}] Redirecting the rest call to {}: cluster={}",
- clientAppId(), redirect, replCluster);
+ URI redirect =
+ UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+ .port(replClusterUrl.getPort())
+ .replaceQueryParam("authoritative", false).build();
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+ clientAppId(), redirect, replCluster);
+ }
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ });
+ }
}
- throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
- }
- }
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- throw new RestException(e);
- }
+ return future.thenCompose(__ -> {
+ NamespaceBundle bundle =
+ validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+ authoritative, true);
+ return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+ .thenCompose(topics -> {
+ CompletableFuture<Void> deleteTopicsFuture =
+ CompletableFuture.completedFuture(null);
+ if (!force) {
+ List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
+ for (String topic : topics) {
+ futures.add(pulsar().getNamespaceService()
+ .getBundleAsync(TopicName.get(topic))
+ .thenCompose(topicBundle -> {
+ if (bundle.equals(topicBundle)) {
+ throw new RestException(Status.CONFLICT,
+ "Cannot delete non empty bundle");
+ }
+ return CompletableFuture.completedFuture(null);
+ }));
- try {
- NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
- authoritative, true);
- // directly remove from owned namespace map and ephemeral node from ZK
- pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
- } catch (WebApplicationException wae) {
- throw wae;
- } catch (Exception e) {
- log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
- bundleRange, e);
- throw new RestException(e);
- }
+ }
+ deleteTopicsFuture = FutureUtil.waitForAll(futures);
+ }
+ return deleteTopicsFuture.thenCompose(
+ ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
+ });
+ });
+ });
}
protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 673435fd4c8..9d09086c9b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -259,14 +259,22 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
- @ApiResponse(code = 409, message = "Namespace bundle is not empty") })
- public void deleteNamespaceBundle(@PathParam("property") String property,
- @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
- @PathParam("bundle") String bundleRange,
- @QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- validateNamespaceName(property, cluster, namespace);
- internalDeleteNamespaceBundle(bundleRange, authoritative, force);
+ @ApiResponse(code = 409, message = "Namespace bundle is not empty")})
+ public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("property") String property,
+ @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+ @PathParam("bundle") String bundleRange,
+ @QueryParam("force") @DefaultValue("false") boolean force,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ validateNamespaceName(property, cluster, namespace);
+ internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
+ .thenRun(() -> response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index ab7388cf3eb..91921489d50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -207,13 +207,22 @@ public class Namespaces extends NamespacesBase {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
- @ApiResponse(code = 409, message = "Namespace bundle is not empty") })
- public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @PathParam("bundle") String bundleRange,
- @QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ @ApiResponse(code = 409, message = "Namespace bundle is not empty")})
+ public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("bundle") String bundleRange,
+ @QueryParam("force") @DefaultValue("false") boolean force,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
- internalDeleteNamespaceBundle(bundleRange, authoritative, force);
+ internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
+ .thenRun(() -> response.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
+ }
+ resumeAsyncResponseExceptionally(response, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 86fd5b58662..eecabf3fb36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1047,10 +1047,9 @@ public class NamespaceService implements AutoCloseable {
.thenCompose(ownershipCache::checkOwnershipAsync);
}
- public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
- ownershipCache.removeOwnership(nsBundle).get(
- pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
- bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
+ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
+ return ownershipCache.removeOwnership(nsBundle)
+ .thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
}
protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index e415c4d9ca1..11a16d4d17f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -837,18 +837,16 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(preconditionFailed).when(namespacesAdmin)
.deleteNamespaceBundleAsync(Mockito.anyString(), Mockito.anyString());
- try {
- namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
- false, false);
- fail("Should have failed");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
- }
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
+ namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal,
+ "0x00000000_0x80000000", false, false);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
- AsyncResponse response = mock(AsyncResponse.class);
+ response = mock(AsyncResponse.class);
namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
- ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
// make one bundle owned
@@ -857,13 +855,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
- try {
- namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
- false, false);
- fail("Should have failed");
- } catch (RestException re) {
- assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
- }
+ response = mock(AsyncResponse.class);
+ namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal,
+ "0x80000000_0xffffffff", false, false);
+ verify(response, timeout(5000).times(1)).resume(captor.capture());
+ assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
response = mock(AsyncResponse.class);
doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
for (NamespaceBundle bundle : nsBundles.getBundles()) {