You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/05/13 03:11:07 UTC

[pulsar] branch master updated: [improve][broker] Make some methods of `ClusterBase` pure async. (#15437)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa03ccd2ccd [improve][broker] Make some methods of `ClusterBase` pure async. (#15437)
aa03ccd2ccd is described below

commit aa03ccd2ccd2ac664a463b16eb6d6a402e76b120
Author: Qiang Zhao <ma...@gmail.com>
AuthorDate: Fri May 13 11:10:57 2022 +0800

    [improve][broker] Make some methods of `ClusterBase` pure async. (#15437)
---
 .../pulsar/broker/admin/impl/ClustersBase.java     | 194 ++++++++++-----------
 .../pulsar/broker/namespace/NamespaceService.java  |  13 +-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   1 +
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  14 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   1 +
 5 files changed, 109 insertions(+), 114 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
index 6ffd2ee0058..80309b2a026 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java
@@ -32,7 +32,6 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -397,31 +396,49 @@ public class ClustersBase extends AdminResource {
             @ApiResponse(code = 404, message = "Cluster doesn't exist."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public Map<String, ? extends NamespaceIsolationData> getNamespaceIsolationPolicies(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster
-    ) throws Exception {
-        validateSuperUserAccess();
-        if (!clusterResources().clusterExists(cluster)) {
-            throw new RestException(Status.NOT_FOUND, "Cluster " + cluster + " does not exist.");
-        }
+    public void getNamespaceIsolationPolicies(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, Status.NOT_FOUND))
+                .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
+    }
 
-        try {
-            NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                            "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to Namespace isolation data map
-            return nsIsolationPolicies.getPolicies();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+    /**
+     * Verify that the cluster exists.
+     * For compatibility to avoid breaking changes, we can specify a REST status code when it doesn't exist.
+     * @param cluster Cluster name
+     * @param notExistStatus REST status code
+     */
+    private CompletableFuture<Void> validateClusterExistAsync(String cluster, Status notExistStatus) {
+        return clusterResources().clusterExistsAsync(cluster)
+                .thenAccept(clusterExist -> {
+                    if (!clusterExist) {
+                        throw new RestException(notExistStatus, "Cluster " + cluster + " does not exist.");
+                    }
+                });
+    }
+
+    private CompletableFuture<Map<String, NamespaceIsolationDataImpl>> internalGetNamespaceIsolationPolicies(
+            String cluster) {
+            return namespaceIsolationPolicies().getIsolationDataPoliciesAsync(cluster)
+                    .thenApply(namespaceIsolationPolicies -> {
+                        if (!namespaceIsolationPolicies.isPresent()) {
+                            throw new RestException(Status.NOT_FOUND,
+                                    "NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
+                        }
+                        return namespaceIsolationPolicies.get().getPolicies();
+                    });
     }
 
+
     @GET
     @Path("/{cluster}/namespaceIsolationPolicies/{policyName}")
     @ApiOperation(
@@ -435,40 +452,28 @@ public class ClustersBase extends AdminResource {
             @ApiResponse(code = 412, message = "Cluster doesn't exist."),
             @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public NamespaceIsolationData getNamespaceIsolationPolicy(
-        @ApiParam(
-            value = "The cluster name",
-            required = true
-        )
-        @PathParam("cluster") String cluster,
-        @ApiParam(
-            value = "The name of the namespace isolation policy",
-            required = true
-        )
+    public void getNamespaceIsolationPolicy(
+        @Suspended AsyncResponse asyncResponse,
+        @ApiParam(value = "The cluster name", required = true) @PathParam("cluster") String cluster,
+        @ApiParam(value = "The name of the namespace isolation policy", required = true)
         @PathParam("policyName") String policyName
-    ) throws Exception {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-
-        try {
-            NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster)
-                    .orElseThrow(() -> new RestException(Status.NOT_FOUND,
-                            "NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
-            // construct the response to Namespace isolation data map
-            if (!nsIsolationPolicies.getPolicies().containsKey(policyName)) {
-                log.info("[{}] Cannot find NamespaceIsolationPolicy {} for cluster {}",
-                        clientAppId(), policyName, cluster);
-                throw new RestException(Status.NOT_FOUND,
-                        "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
-            }
-            return nsIsolationPolicies.getPolicies().get(policyName);
-        } catch (RestException re) {
-            throw re;
-        } catch (Exception e) {
-            log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
+    ) {
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
+                .thenCompose(__ -> internalGetNamespaceIsolationPolicies(cluster))
+                .thenAccept(policies -> {
+                    // construct the response to Namespace isolation data map
+                    if (!policies.containsKey(policyName)) {
+                        throw new RestException(Status.NOT_FOUND,
+                                "Cannot find NamespaceIsolationPolicy " + policyName + " for cluster " + cluster);
+                    }
+                    asyncResponse.resume(policies.get(policyName));
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to get clusters/{}/namespaceIsolationPolicies/{}",
+                            clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
@@ -485,53 +490,44 @@ public class ClustersBase extends AdminResource {
         @ApiResponse(code = 412, message = "Cluster doesn't exist."),
         @ApiResponse(code = 500, message = "Internal server error.")
     })
-    public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(
-            @ApiParam(
-                value = "The cluster name",
-                required = true
-            )
+    public void getBrokersWithNamespaceIsolationPolicy(
+            @Suspended AsyncResponse asyncResponse,
+            @ApiParam(value = "The cluster name", required = true)
             @PathParam("cluster") String cluster) {
-        validateSuperUserAccess();
-        validateClusterExists(cluster);
-
-        Set<String> availableBrokers;
-        Map<String, ? extends NamespaceIsolationData> nsPolicies;
-        try {
-            availableBrokers = pulsar().getLoadManager().get().getAvailableBrokers();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get list of brokers in cluster {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-        try {
-            Optional<NamespaceIsolationPolicies> nsPoliciesResult = namespaceIsolationPolicies()
-                    .getIsolationDataPolicies(cluster);
-            if (!nsPoliciesResult.isPresent()) {
-                throw new RestException(Status.NOT_FOUND, "namespace-isolation policies not found for " + cluster);
-            }
-            nsPolicies = nsPoliciesResult.get().getPolicies();
-        } catch (Exception e) {
-            log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, e);
-            throw new RestException(e);
-        }
-        return availableBrokers.stream().map(broker -> {
-            BrokerNamespaceIsolationData.Builder brokerIsolationData = BrokerNamespaceIsolationData.builder()
-                    .brokerName(broker);
-            if (nsPolicies != null) {
-                List<String> namespaceRegexes = new ArrayList<>();
-                nsPolicies.forEach((name, policyData) -> {
-                    NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
-                    if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
-                        namespaceRegexes.addAll(policyData.getNamespaces());
-                        if (nsPolicyImpl.isPrimaryBroker(broker)) {
-                            brokerIsolationData.primary(true);
-                        }
-                    }
+        validateSuperUserAccessAsync()
+                .thenCompose(__ -> validateClusterExistAsync(cluster, Status.PRECONDITION_FAILED))
+                .thenCompose(__ -> pulsar().getLoadManager().get().getAvailableBrokersAsync())
+                .thenCompose(availableBrokers -> internalGetNamespaceIsolationPolicies(cluster)
+                        .thenApply(policies -> availableBrokers.stream()
+                                .map(broker -> internalGetBrokerNsIsolationData(broker, policies))
+                                .collect(Collectors.toList())))
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    log.error("[{}] Failed to get namespace isolation-policies {}", clientAppId(), cluster, ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
                 });
+    }
 
-                brokerIsolationData.namespaceRegex(namespaceRegexes);
-            }
+    private BrokerNamespaceIsolationData internalGetBrokerNsIsolationData(
+                                                                    String broker,
+                                                                    Map<String, NamespaceIsolationDataImpl> policies) {
+        BrokerNamespaceIsolationData.Builder brokerIsolationData =
+                BrokerNamespaceIsolationData.builder().brokerName(broker);
+        if (policies == null) {
             return brokerIsolationData.build();
-        }).collect(Collectors.toList());
+        }
+        List<String> namespaceRegexes = new ArrayList<>();
+        policies.forEach((name, policyData) -> {
+            NamespaceIsolationPolicyImpl nsPolicyImpl = new NamespaceIsolationPolicyImpl(policyData);
+            if (nsPolicyImpl.isPrimaryBroker(broker) || nsPolicyImpl.isSecondaryBroker(broker)) {
+                namespaceRegexes.addAll(policyData.getNamespaces());
+                brokerIsolationData.primary(nsPolicyImpl.isPrimaryBroker(broker));
+                brokerIsolationData.policyName(name);
+            }
+        });
+        brokerIsolationData.namespaceRegex(namespaceRegexes);
+        return brokerIsolationData.build();
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 558d408f1d3..8d6480db964 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -733,8 +733,10 @@ public class NamespaceService implements AutoCloseable {
     }
 
     public CompletableFuture<Map<String, NamespaceOwnershipStatus>> getOwnedNameSpacesStatusAsync() {
-       return getLocalNamespaceIsolationPoliciesAsync()
-                .thenCompose(namespaceIsolationPolicies -> {
+       return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
+               .getIsolationDataPoliciesAsync(pulsar.getConfiguration().getClusterName())
+               .thenApply(nsIsolationPoliciesOpt -> nsIsolationPoliciesOpt.orElseGet(NamespaceIsolationPolicies::new))
+               .thenCompose(namespaceIsolationPolicies -> {
                     Collection<CompletableFuture<OwnedBundle>> futures =
                             ownershipCache.getOwnedBundlesAsync().values();
                     return FutureUtil.waitForAll(futures)
@@ -768,13 +770,6 @@ public class NamespaceService implements AutoCloseable {
         return nsOwnedStatus;
     }
 
-    private CompletableFuture<NamespaceIsolationPolicies> getLocalNamespaceIsolationPoliciesAsync() {
-        String localCluster = pulsar.getConfiguration().getClusterName();
-        return pulsar.getPulsarResources().getNamespaceResources().getIsolationPolicies()
-                .getIsolationDataPoliciesAsync(localCluster)
-                .thenApply(nsIsolationPolicies -> nsIsolationPolicies.orElseGet(NamespaceIsolationPolicies::new));
-    }
-
     public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exception {
         try {
             // Does ZooKeeper says that the namespace is disabled?
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a6b89112634..a1a674eac9c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1029,6 +1029,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         assertEquals(brokerIsolationDataList.get(0).getBrokerName(), brokerAddress);
         assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().size(), 1);
         assertEquals(brokerIsolationDataList.get(0).getNamespaceRegex().get(0), namespaceRegex);
+        assertEquals(brokerIsolationDataList.get(0).getPolicyName(), policyName1);
 
         BrokerNamespaceIsolationDataImpl brokerIsolationData = (BrokerNamespaceIsolationDataImpl) admin.clusters()
                 .getBrokerWithNamespaceIsolationPolicy(cluster, brokerAddress);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 4677ccb521e..bfc29250028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -202,6 +202,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
     public void clusters() throws Exception {
         assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
         verify(clusters, never()).validateSuperUserAccessAsync();
@@ -239,7 +240,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
 
         try {
-            clusters.getNamespaceIsolationPolicies("use");
+            asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
@@ -259,7 +260,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
                 .build();
         AsyncResponse response = mock(AsyncResponse.class);
         clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
-        clusters.getNamespaceIsolationPolicies("use");
+        asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
 
         try {
             asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
@@ -269,7 +270,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         clusters.deleteNamespaceIsolationPolicy("use", "policy1");
-        assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
+        assertTrue(((Map<String, NamespaceIsolationDataImpl>) asynRequests(ctx ->
+                clusters.getNamespaceIsolationPolicies(ctx, "use"))).isEmpty());
 
         asynRequests(ctx -> clusters.deleteCluster(ctx, "use"));
         assertEquals(asynRequests(ctx -> clusters.getClusters(ctx)), Sets.newHashSet());
@@ -289,7 +291,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         }
 
         try {
-            clusters.getNamespaceIsolationPolicies("use");
+            asynRequests(ctx -> clusters.getNamespaceIsolationPolicies(ctx, "use"));
             fail("should have failed");
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), 404);
@@ -406,8 +408,8 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         } catch (RestException e) {
             assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
         }
-        verify(clusters, times(18)).validateSuperUserAccessAsync();
-        verify(clusters, times(6)).validateSuperUserAccess();
+        verify(clusters, times(22)).validateSuperUserAccessAsync();
+        verify(clusters, times(2)).validateSuperUserAccess();
     }
 
     Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 6ff29a1bc19..6289dca6e68 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -287,6 +287,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
             List<BrokerNamespaceIsolationData> isoList = admin.clusters().getBrokersWithNamespaceIsolationPolicy("use");
             assertEquals(isoList.size(), 1);
             assertTrue(isoList.get(0).isPrimary());
+            assertEquals(isoList.get(0).getPolicyName(), policyName1);
 
             // verify update of primary
             nsPolicyData1.getPrimary().remove(0);