You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/07/13 11:44:27 UTC

[pulsar] branch master updated: [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 147672f5428 [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)
147672f5428 is described below

commit 147672f5428e910c66dbbf90a2af18b78f8bf104
Author: Zike Yang <zi...@apache.org>
AuthorDate: Wed Jul 13 19:44:19 2022 +0800

    [improve][broker] Make Namespaces.deleteNamespaceBundle async (#16287)
    
    Master Issue: #14365
    
    ### Motivation
    
    Please see #14365
    
    ### Modifications
    
    * Make Namespaces.deleteNamespaceBundle async
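    * Combine internalDeleteNamespaceBundle and internalDeleteNamespaceBundleForcefully into a single internalDeleteNamespaceBundleAsync that takes a `force` flag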
    * Make NamespaceService.removeOwnedServiceUnit async (renamed to removeOwnedServiceUnitAsync)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 226 ++++++++-------------
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  24 ++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  21 +-
 .../pulsar/broker/namespace/NamespaceService.java  |   7 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |  28 ++-
 5 files changed, 131 insertions(+), 175 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 677a9532f62..1766874322c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -25,8 +25,10 @@ import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
+import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -551,153 +553,95 @@ public abstract class NamespacesBase extends AdminResource {
         });
     }
 
-    protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
-        if (force) {
-            internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
-        } else {
-            internalDeleteNamespaceBundle(bundleRange, authoritative);
-        }
-    }
-
     @SuppressWarnings("deprecation")
-    protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative) {
-        validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
-        validatePoliciesReadOnlyAccess();
-
-        // ensure that non-global namespace is directed to the correct cluster
-        if (!namespaceName.isGlobal()) {
-            validateClusterOwnership(namespaceName.getCluster());
-        }
-
-        Policies policies = getNamespacePolicies(namespaceName);
-        // ensure the local cluster is the only cluster for the global namespace configuration
-        try {
-            if (namespaceName.isGlobal()) {
-                if (policies.replication_clusters.size() > 1) {
-                    // There are still more than one clusters configured for the global namespace
-                    throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
-                            + namespaceName + ". There are still more than one replication clusters configured.");
-                }
-                if (policies.replication_clusters.size() == 1
-                        && !policies.replication_clusters.contains(config().getClusterName())) {
-                    // the only replication cluster is other cluster, redirect
-                    String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
-                    ClusterData replClusterData =
-                            clusterResources().getCluster(replCluster)
-                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                                    "Cluster " + replCluster + " does not exist"));
-                    URL replClusterUrl;
-                    if (!config().isTlsEnabled() || !isRequestHttps()) {
-                        replClusterUrl = new URL(replClusterData.getServiceUrl());
-                    } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
-                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
-                    } else {
-                        throw new RestException(Status.PRECONDITION_FAILED,
-                                "The replication cluster does not provide TLS encrypted service");
-                    }
-                    URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
-                            .port(replClusterUrl.getPort()).replaceQueryParam("authoritative", false).build();
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Redirecting the rest call to {}: cluster={}",
-                                clientAppId(), redirect, replCluster);
+    protected CompletableFuture<Void> internalDeleteNamespaceBundleAsync(String bundleRange, boolean authoritative,
+                                                                         boolean force) {
+        return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_BUNDLE)
+                .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
+                .thenCompose(__ -> {
+                    if (!namespaceName.isGlobal()) {
+                        return validateClusterOwnershipAsync(namespaceName.getCluster());
                     }
-                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-                }
-            }
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
-
-        try {
-            NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
-            List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
-                    .get(config().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
-            for (String topic : topics) {
-                NamespaceBundle topicBundle = pulsar().getNamespaceService()
-                        .getBundle(TopicName.get(topic));
-                if (bundle.equals(topicBundle)) {
-                    throw new RestException(Status.CONFLICT, "Cannot delete non empty bundle");
-                }
-            }
-
-            // remove from owned namespace map and ephemeral node from ZK
-            pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
-                    bundleRange, e);
-            throw new RestException(e);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    protected void internalDeleteNamespaceBundleForcefully(String bundleRange, boolean authoritative) {
-        validateNamespaceOperation(namespaceName, NamespaceOperation.DELETE_BUNDLE);
-        validatePoliciesReadOnlyAccess();
+                    return CompletableFuture.completedFuture(null);
+                })
+                .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+                .thenCompose(policies -> {
+                    CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
+                    if (namespaceName.isGlobal()) {
 
-        // ensure that non-global namespace is directed to the correct cluster
-        if (!namespaceName.isGlobal()) {
-            validateClusterOwnership(namespaceName.getCluster());
-        }
+                        if (policies.replication_clusters.size() > 1) {
+                            // There are still more than one clusters configured for the global namespace
+                            throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
+                                    + namespaceName
+                                    + ". There are still more than one replication clusters configured.");
+                        }
+                        if (policies.replication_clusters.size() == 1
+                                && !policies.replication_clusters.contains(config().getClusterName())) {
+                            // the only replication cluster is other cluster, redirect
+                            String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
+                            future = clusterResources().getClusterAsync(replCluster)
+                                    .thenCompose(clusterData -> {
+                                        if (clusterData.isEmpty()) {
+                                            throw new RestException(Status.NOT_FOUND,
+                                                    "Cluster " + replCluster + " does not exist");
+                                        }
+                                        ClusterData replClusterData = clusterData.get();
+                                        URL replClusterUrl;
+                                        try {
+                                            if (!config().isTlsEnabled() || !isRequestHttps()) {
+                                                replClusterUrl = new URL(replClusterData.getServiceUrl());
+                                            } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
+                                                replClusterUrl = new URL(replClusterData.getServiceUrlTls());
+                                            } else {
+                                                throw new RestException(Status.PRECONDITION_FAILED,
+                                                        "The replication cluster does not provide TLS encrypted "
+                                                                + "service");
+                                            }
+                                        } catch (MalformedURLException malformedURLException) {
+                                            throw new RestException(malformedURLException);
+                                        }
 
-        Policies policies = getNamespacePolicies(namespaceName);
-        // ensure the local cluster is the only cluster for the global namespace configuration
-        try {
-            if (namespaceName.isGlobal()) {
-                if (policies.replication_clusters.size() > 1) {
-                    // There are still more than one clusters configured for the global namespace
-                    throw new RestException(Status.PRECONDITION_FAILED, "Cannot delete the global namespace "
-                            + namespaceName + ". There are still more than one replication clusters configured.");
-                }
-                if (policies.replication_clusters.size() == 1
-                        && !policies.replication_clusters.contains(config().getClusterName())) {
-                    // the only replication cluster is other cluster, redirect
-                    String replCluster = Lists.newArrayList(policies.replication_clusters).get(0);
-                    ClusterData replClusterData =
-                            clusterResources().getCluster(replCluster)
-                            .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                                    "Cluster " + replCluster + " does not exist"));
-                    URL replClusterUrl;
-                    if (!config().isTlsEnabled() || !isRequestHttps()) {
-                        replClusterUrl = new URL(replClusterData.getServiceUrl());
-                    } else if (StringUtils.isNotBlank(replClusterData.getServiceUrlTls())) {
-                        replClusterUrl = new URL(replClusterData.getServiceUrlTls());
-                    } else {
-                        throw new RestException(Status.PRECONDITION_FAILED,
-                                "The replication cluster does not provide TLS encrypted service");
-                    }
-                    URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
-                            .port(replClusterUrl.getPort())
-                            .replaceQueryParam("authoritative", false).build();
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Redirecting the rest call to {}: cluster={}",
-                                clientAppId(), redirect, replCluster);
+                                        URI redirect =
+                                                UriBuilder.fromUri(uri.getRequestUri()).host(replClusterUrl.getHost())
+                                                        .port(replClusterUrl.getPort())
+                                                        .replaceQueryParam("authoritative", false).build();
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] Redirecting the rest call to {}: cluster={}",
+                                                    clientAppId(), redirect, replCluster);
+                                        }
+                                        throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                                    });
+                        }
                     }
-                    throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
-                }
-            }
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            throw new RestException(e);
-        }
+                    return future.thenCompose(__ -> {
+                        NamespaceBundle bundle =
+                                validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
+                                        authoritative, true);
+                        return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+                                .thenCompose(topics -> {
+                                    CompletableFuture<Void> deleteTopicsFuture =
+                                            CompletableFuture.completedFuture(null);
+                                    if (!force) {
+                                        List<CompletableFuture<NamespaceBundle>> futures = new ArrayList<>();
+                                        for (String topic : topics) {
+                                            futures.add(pulsar().getNamespaceService()
+                                                    .getBundleAsync(TopicName.get(topic))
+                                                    .thenCompose(topicBundle -> {
+                                                        if (bundle.equals(topicBundle)) {
+                                                            throw new RestException(Status.CONFLICT,
+                                                                    "Cannot delete non empty bundle");
+                                                        }
+                                                        return CompletableFuture.completedFuture(null);
+                                                    }));
 
-        try {
-            NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
-                authoritative, true);
-            // directly remove from owned namespace map and ephemeral node from ZK
-            pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
-        } catch (WebApplicationException wae) {
-            throw wae;
-        } catch (Exception e) {
-            log.error("[{}] Failed to remove namespace bundle {}/{}", clientAppId(), namespaceName.toString(),
-                    bundleRange, e);
-            throw new RestException(e);
-        }
+                                        }
+                                        deleteTopicsFuture = FutureUtil.waitForAll(futures);
+                                    }
+                                    return deleteTopicsFuture.thenCompose(
+                                            ___ -> pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
+                                });
+                    });
+                });
     }
 
     protected CompletableFuture<Void> internalGrantPermissionOnNamespaceAsync(String role, Set<AuthAction> actions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 673435fd4c8..9d09086c9b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -259,14 +259,22 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
-            @ApiResponse(code = 409, message = "Namespace bundle is not empty") })
-    public void deleteNamespaceBundle(@PathParam("property") String property,
-            @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
-            @PathParam("bundle") String bundleRange,
-            @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        validateNamespaceName(property, cluster, namespace);
-        internalDeleteNamespaceBundle(bundleRange, authoritative, force);
+            @ApiResponse(code = 409, message = "Namespace bundle is not empty")})
+    public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("property") String property,
+                                      @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
+                                      @PathParam("bundle") String bundleRange,
+                                      @QueryParam("force") @DefaultValue("false") boolean force,
+                                      @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateNamespaceName(property, cluster, namespace);
+        internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
+                .thenRun(() -> response.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index ab7388cf3eb..91921489d50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -207,13 +207,22 @@ public class Namespaces extends NamespacesBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
-            @ApiResponse(code = 409, message = "Namespace bundle is not empty") })
-    public void deleteNamespaceBundle(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
-            @PathParam("bundle") String bundleRange,
-            @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @ApiResponse(code = 409, message = "Namespace bundle is not empty")})
+    public void deleteNamespaceBundle(@Suspended AsyncResponse response, @PathParam("tenant") String tenant,
+                                      @PathParam("namespace") String namespace,
+                                      @PathParam("bundle") String bundleRange,
+                                      @QueryParam("force") @DefaultValue("false") boolean force,
+                                      @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateNamespaceName(tenant, namespace);
-        internalDeleteNamespaceBundle(bundleRange, authoritative, force);
+        internalDeleteNamespaceBundleAsync(bundleRange, authoritative, force)
+                .thenRun(() -> response.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to delete namespace bundle {}", clientAppId(), namespaceName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(response, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 86fd5b58662..eecabf3fb36 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1047,10 +1047,9 @@ public class NamespaceService implements AutoCloseable {
                 .thenCompose(ownershipCache::checkOwnershipAsync);
     }
 
-    public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
-        ownershipCache.removeOwnership(nsBundle).get(
-                pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-        bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
+    public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBundle) {
+        return ownershipCache.removeOwnership(nsBundle)
+                .thenRun(() -> bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject()));
     }
 
     protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index e415c4d9ca1..11a16d4d17f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -837,18 +837,16 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(preconditionFailed).when(namespacesAdmin)
                 .deleteNamespaceBundleAsync(Mockito.anyString(), Mockito.anyString());
 
-        try {
-            namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
-                    false, false);
-            fail("Should have failed");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-        }
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
+        namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal,
+                "0x00000000_0x80000000", false, false);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         NamespaceBundles nsBundles = nsSvc.getNamespaceBundleFactory().getBundles(testNs, bundleData);
         doReturn(Optional.empty()).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
-        AsyncResponse response = mock(AsyncResponse.class);
+        response = mock(AsyncResponse.class);
         namespaces.deleteNamespace(response, testTenant, testLocalCluster, bundledNsLocal, false, false);
-        ArgumentCaptor<RestException> captor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(captor.capture());
         assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         // make one bundle owned
@@ -857,13 +855,11 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         doReturn(true).when(nsSvc).isServiceUnitOwned(nsBundles.getBundles().get(0));
         doReturn(CompletableFuture.completedFuture(null)).when(namespacesAdmin).deleteNamespaceBundleAsync(
                 testTenant + "/" + testLocalCluster + "/" + bundledNsLocal, "0x00000000_0x80000000");
-        try {
-            namespaces.deleteNamespaceBundle(testTenant, testLocalCluster, bundledNsLocal, "0x80000000_0xffffffff",
-                    false, false);
-            fail("Should have failed");
-        } catch (RestException re) {
-            assertEquals(re.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
-        }
+        response = mock(AsyncResponse.class);
+        namespaces.deleteNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal,
+                "0x80000000_0xffffffff",  false, false);
+        verify(response, timeout(5000).times(1)).resume(captor.capture());
+        assertEquals(captor.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         response = mock(AsyncResponse.class);
         doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getWebServiceUrl(any(NamespaceBundle.class), any(LookupOptions.class));
         for (NamespaceBundle bundle : nsBundles.getBundles()) {