You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/27 14:37:02 UTC
[pulsar] branch master updated: [improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d09c6eb26ad [improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)
d09c6eb26ad is described below
commit d09c6eb26adcaf0aff2c3464e445eb8df5af70a6
Author: yanliang <ya...@163.com>
AuthorDate: Fri May 27 22:36:51 2022 +0800
[improve][broker] Make some operation SubscribeRate methods in Namespaces async (#15656)
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 43 ++----
.../apache/pulsar/broker/admin/v2/Namespaces.java | 45 ++++--
.../pulsar/broker/admin/NamespacesV2Test.java | 162 +++++++++++++++++++++
3 files changed, 210 insertions(+), 40 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index dd882dfdadf..b143a03038b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1363,43 +1363,30 @@ public abstract class NamespacesBase extends AdminResource {
return policies.subscriptionDispatchRate.get(pulsar().getConfiguration().getClusterName());
}
- protected void internalSetSubscribeRate(SubscribeRate subscribeRate) {
- validateSuperUserAccess();
+ protected CompletableFuture<Void> internalSetSubscribeRateAsync(SubscribeRate subscribeRate) {
log.info("[{}] Set namespace subscribe-rate {}/{}", clientAppId(), namespaceName, subscribeRate);
- try {
- updatePolicies(namespaceName, policies -> {
- policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
- return policies;
- });
+ return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.clusterSubscribeRate.put(pulsar().getConfiguration().getClusterName(), subscribeRate);
log.info("[{}] Successfully updated the subscribeRate for cluster on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to update the subscribeRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ return policies;
+ }));
}
- protected void internalDeleteSubscribeRate() {
- validateSuperUserAccess();
- try {
- updatePolicies(namespaceName, policies -> {
- policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
- return policies;
- });
+ protected CompletableFuture<Void> internalDeleteSubscribeRateAsync() {
+ return validateSuperUserAccessAsync().thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
+ policies.clusterSubscribeRate.remove(pulsar().getConfiguration().getClusterName());
log.info("[{}] Successfully delete the subscribeRate for cluster on namespace {}", clientAppId(),
namespaceName);
- } catch (Exception e) {
- log.error("[{}] Failed to delete the subscribeRate for cluster on namespace {}", clientAppId(),
- namespaceName, e);
- throw new RestException(e);
- }
+ return policies;
+ }));
}
- protected SubscribeRate internalGetSubscribeRate() {
- validateNamespacePolicyOperation(namespaceName, PolicyName.RATE, PolicyOperation.READ);
- Policies policies = getNamespacePolicies(namespaceName);
- return policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName());
+
+ protected CompletableFuture<SubscribeRate> internalGetSubscribeRateAsync() {
+ return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.RATE, PolicyOperation.READ)
+ .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+ .thenApply(policies -> policies.clusterSubscribeRate.get(pulsar().getConfiguration().getClusterName()));
}
protected void internalRemoveReplicatorDispatchRate() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 52e47b534dc..e246c7d86e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -815,31 +815,52 @@ public class Namespaces extends NamespacesBase {
@DELETE
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Delete subscribe-rate throttling for all topics of the namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void deleteSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+ public void deleteSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- internalDeleteSubscribeRate();
+ internalDeleteSubscribeRateAsync()
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to delete the subscribeRate for cluster on namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Set subscribe-rate throttling for all topics of the namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") })
- public void setSubscribeRate(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
- @ApiParam(value = "Subscribe rate for all topics of the specified namespace") SubscribeRate subscribeRate) {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+ public void setSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @ApiParam(value = "Subscribe rate for all topics of the specified namespace")
+ SubscribeRate subscribeRate) {
validateNamespaceName(tenant, namespace);
- internalSetSubscribeRate(subscribeRate);
+ internalSetSubscribeRateAsync(subscribeRate)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ log.error("Failed to update the subscribeRate for cluster on namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
@Path("/{tenant}/{namespace}/subscribeRate")
@ApiOperation(value = "Get subscribe-rate configured for the namespace")
- @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
- @ApiResponse(code = 404, message = "Namespace does not exist") })
- public SubscribeRate getSubscribeRate(@PathParam("tenant") String tenant,
- @PathParam("namespace") String namespace) {
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist")})
+ public void getSubscribeRate(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
- return internalGetSubscribeRate();
+ internalGetSubscribeRateAsync()
+ .thenAccept(subscribeRate -> asyncResponse.resume(subscribeRate))
+ .exceptionally(ex -> {
+ log.error("Failed to get subscribe rate for namespace {}", namespaceName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
new file mode 100644
index 00000000000..4c4929390bd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesV2Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Objects;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
+import org.apache.pulsar.broker.admin.v2.Namespaces;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PolicyName;
+import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin-v2")
+public class NamespacesV2Test extends MockedPulsarServiceBaseTest {
+ private static final Logger log = LoggerFactory.getLogger(NamespacesV2Test.class);
+
+ private Namespaces namespaces;
+
+ private List<NamespaceName> testLocalNamespaces;
+ private final String testNamespace = "v2-test-namespace";
+ private final String testTenant = "v2-tenant";
+ private final String testLocalCluster = "use";
+
+ protected NamespaceService nsSvc;
+ protected Field uriField;
+ protected UriInfo uriInfo;
+
+ public NamespacesV2Test() {
+ super();
+ }
+
+ @BeforeClass
+ public void initNamespace() throws Exception {
+ testLocalNamespaces = Lists.newArrayList();
+ testLocalNamespaces.add(NamespaceName.get(this.testTenant, this.testLocalCluster, this.testNamespace));
+
+ uriField = PulsarWebResource.class.getDeclaredField("uri");
+ uriField.setAccessible(true);
+ uriInfo = mock(UriInfo.class);
+ }
+
+ @Override
+ @BeforeMethod
+ public void setup() throws Exception {
+ resetConfig();
+ conf.setClusterName(testLocalCluster);
+ super.internalSetup();
+
+ namespaces = spy(Namespaces.class);
+ namespaces.setServletContext(new MockServletContext());
+ namespaces.setPulsar(pulsar);
+ doReturn(false).when(namespaces).isRequestHttps();
+ doReturn("test").when(namespaces).clientAppId();
+ doReturn(null).when(namespaces).originalPrincipal();
+ doReturn(null).when(namespaces).clientAuthData();
+ doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
+
+ admin.clusters().createCluster("use", ClusterData.builder().serviceUrl("http://broker-use.com:8080").build());
+ admin.clusters().createCluster("usw", ClusterData.builder().serviceUrl("http://broker-usw.com:8080").build());
+ admin.clusters().createCluster("usc", ClusterData.builder().serviceUrl("http://broker-usc.com:8080").build());
+ admin.tenants().createTenant(this.testTenant,
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc", "usw")));
+
+ createTestNamespaces(this.testLocalNamespaces);
+
+ doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.PERSISTENCE, PolicyOperation.WRITE);
+
+ doThrow(new RestException(Response.Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
+ .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
+ PolicyName.RETENTION, PolicyOperation.WRITE);
+
+ nsSvc = pulsar.getNamespaceService();
+ }
+
+ @Override
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ conf.setClusterName(testLocalCluster);
+ }
+
+ private void createTestNamespaces(List<NamespaceName> nsnames) throws Exception {
+ for (NamespaceName nsName : nsnames) {
+ asyncRequests(ctx -> namespaces.createNamespace(ctx, nsName.getTenant(),
+ nsName.getLocalName(), null));
+ }
+ }
+
+ @Test
+ public void testOperationSubscribeRate() throws Exception {
+ // 1. set a default-constructed SubscribeRate on the test namespace
+ asyncRequests(response -> namespaces.setSubscribeRate(response, this.testTenant,
+ this.testNamespace, new SubscribeRate()));
+
+ // 2. read it back: a value must be stored, and SubscribeRate.normalize() must map it to null
+ SubscribeRate subscribeRate =
+ (SubscribeRate) asyncRequests(response -> namespaces.getSubscribeRate(response,
+ this.testTenant, this.testNamespace));
+ assertTrue(Objects.nonNull(subscribeRate));
+ assertTrue(Objects.isNull(SubscribeRate.normalize(subscribeRate)));
+
+ // 3. delete the subscribe rate, then verify that a subsequent get returns null
+ asyncRequests(response -> namespaces.deleteSubscribeRate(response, this.testTenant, this.testNamespace));
+ subscribeRate =
+ (SubscribeRate) asyncRequests(response -> namespaces.getSubscribeRate(response,
+ this.testTenant, this.testNamespace));
+ assertTrue(Objects.isNull(subscribeRate));
+
+ // 4. a namespace name with a trailing "/" must fail with a PRECONDITION_FAILED RestException
+ String invalidNamespace = this.testNamespace + "/";
+ try {
+ asyncRequests(response -> namespaces.setSubscribeRate(response, this.testTenant, invalidNamespace,
+ new SubscribeRate()));
+ fail("should have failed");
+ } catch (RestException e) {
+ assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+ }
+ }
+
+}