You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/18 18:03:06 UTC
[pulsar] branch master updated: [PIP-82] [pulsar-broker] CRUD
support for ResourceGroup (#10218)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 86e89ac [PIP-82] [pulsar-broker] CRUD support for ResourceGroup (#10218)
86e89ac is described below
commit 86e89acee615dcc31ba6eebb38cf1749466daafa
Author: ravi-vaidyanathan <79...@users.noreply.github.com>
AuthorDate: Sun Apr 18 11:02:00 2021 -0700
[PIP-82] [pulsar-broker] CRUD support for ResourceGroup (#10218)
* CRUD support for ResourceGroup
* remove ZK cache, use lombok @data
Co-authored-by: Ravi Vaidyanathan <rv...@splunk.com>
Co-authored-by: Matteo Merli <mm...@apache.org>
---
.../broker/cache/ConfigurationCacheService.java | 2 +
.../pulsar/broker/resources/PulsarResources.java | 2 +
.../broker/resources/ResourceGroupResources.java | 35 +---
.../pulsar/broker/admin/impl/NamespacesBase.java | 24 ++-
.../broker/admin/impl/ResourceGroupsBase.java | 180 ++++++++++++++++++++
.../apache/pulsar/broker/admin/v2/Namespaces.java | 41 +++++
.../pulsar/broker/admin/v2/ResourceGroups.java | 82 +++++++++
.../pulsar/broker/web/PulsarWebResource.java | 24 +++
.../pulsar/broker/admin/ResourceGroupsTest.java | 181 ++++++++++++++++++++
.../org/apache/pulsar/client/admin/Namespaces.java | 91 ++++++++++
.../apache/pulsar/client/admin/PulsarAdmin.java | 5 +
.../apache/pulsar/client/admin/ResourceGroups.java | 186 +++++++++++++++++++++
.../client/admin/internal/NamespacesImpl.java | 81 +++++++++
.../client/admin/internal/PulsarAdminImpl.java | 10 ++
.../client/admin/internal/ResourceGroupsImpl.java | 169 +++++++++++++++++++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 44 +++++
.../apache/pulsar/admin/cli/CmdResourceGroups.java | 145 ++++++++++++++++
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 1 +
.../pulsar/common/policies/data/Policies.java | 12 +-
.../pulsar/common/policies/data/PolicyName.java | 3 +-
.../data/{PolicyName.java => ResourceGroup.java} | 38 ++---
21 files changed, 1295 insertions(+), 61 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 33b30f2..fa29bdf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -65,6 +66,7 @@ public class ConfigurationCacheService {
private PulsarResources pulsarResources;
public static final String POLICIES = "policies";
+ public static final String RESOURCEGROUPS = "resourcegroups";
public static final String FAILURE_DOMAIN = "failureDomain";
public final String CLUSTER_FAILURE_DOMAIN_ROOT;
public static final String POLICIES_ROOT = "/admin/policies";
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index 48db47f..fa4853a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -33,6 +33,7 @@ public class PulsarResources {
public static final int DEFAULT_OPERATION_TIMEOUT_SEC = 30;
private TenantResources tenantResources;
private ClusterResources clusterResources;
+ private ResourceGroupResources resourcegroupResources;
private NamespaceResources namespaceResources;
private DynamicConfigurationResources dynamicConfigResources;
private LocalPoliciesResources localPolicies;
@@ -49,6 +50,7 @@ public class PulsarResources {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+ resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
}
if (localMetadataStore != null) {
dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
similarity index 60%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
index 81cfb2a..7d6bf6f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
@@ -16,34 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.broker.resources;
-package org.apache.pulsar.common.policies.data;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
- ALL,
- ANTI_AFFINITY,
- BACKLOG,
- COMPACTION,
- DELAYED_DELIVERY,
- INACTIVE_TOPIC,
- DEDUPLICATION,
- MAX_CONSUMERS,
- MAX_PRODUCERS,
- DEDUPLICATION_SNAPSHOT,
- MAX_UNACKED,
- MAX_SUBSCRIPTIONS,
- OFFLOAD,
- PERSISTENCE,
- RATE,
- RETENTION,
- REPLICATION,
- REPLICATION_RATE,
- SCHEMA_COMPATIBILITY_STRATEGY,
- SUBSCRIPTION_AUTH_MODE,
- ENCRYPTION,
- TTL,
- MAX_TOPICS
+public class ResourceGroupResources extends BaseResources<ResourceGroup> {
+ public ResourceGroupResources(MetadataStoreExtended store, int operationTimeoutSec) {
+ super(store, ResourceGroup.class, operationTimeoutSec);
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b53838..e33ddfa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.common.policies.data.Policies.getBundles;
import com.google.common.collect.Lists;
@@ -2565,5 +2566,26 @@ public abstract class NamespacesBase extends AdminResource {
}
}
- private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
+ protected void internalSetNamespaceResourceGroup(String rgName) {
+ validateNamespacePolicyOperation(namespaceName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+ validatePoliciesReadOnlyAccess();
+
+ if (rgName != null) {
+ final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+ // check resourcegroup exists.
+ try {
+ if (!resourceGroupResources().exists(resourceGroupPath)) {
+ throw new RestException(Status.PRECONDITION_FAILED, "ResourceGroup does not exist");
+ }
+ } catch (Exception e) {
+ log.error("[{}] Invalid ResourceGroup {}: {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ }
+
+ internalSetPolicies("resource_group_name", rgName);
+ }
+
+
+ private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java
new file mode 100644
index 0000000..351f521
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import java.util.Iterator;
+import java.util.List;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ResourceGroupsBase extends AdminResource {
+ protected List<String> internalGetResourceGroups() {
+ try {
+ validateSuperUserAccess();
+ return getListOfResourcegroups("abc");
+ } catch (KeeperException.NoNodeException e) {
+ log.warn("[{}] Failed to get ResourceGroups list ", clientAppId());
+ throw new RestException(Response.Status.NOT_FOUND, "Property does not exist");
+ } catch (Exception e) {
+ log.error("[{}] Failed to get ResourceGroups list: {}", clientAppId(), e);
+ throw new RestException(e);
+ }
+ }
+
+ protected ResourceGroup internalGetResourceGroup(String rgName) {
+ try {
+ final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+ ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath)
+ .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));
+ return resourceGroup;
+ } catch (RestException re) {
+ throw re;
+ } catch (Exception e) {
+ log.error("[{}] Failed to get ResourceGroup {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected void internalUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
+ final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+
+ try {
+ ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath).orElseThrow(() ->
+ new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));
+
+ /*
+ * assuming read-modify-write
+ */
+ resourceGroup.setPublishRateInMsgs(rgConfig.getPublishRateInMsgs());
+ resourceGroup.setPublishRateInBytes(rgConfig.getPublishRateInBytes());
+ resourceGroup.setDispatchRateInMsgs(rgConfig.getDispatchRateInMsgs());
+ resourceGroup.setDispatchRateInBytes(rgConfig.getDispatchRateInBytes());
+
+ // write back the new ResourceGroup config.
+ resourceGroupResources().set(resourceGroupPath, r -> resourceGroup);
+ log.info("[{}] Successfully updated the ResourceGroup {}", clientAppId(), rgName);
+ } catch (RestException pfe) {
+ throw pfe;
+ } catch (Exception e) {
+ log.error("[{}] Failed to update configuration for ResourceGroup {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected void internalCreateResourceGroup(String rgName, ResourceGroup rgConfig) {
+ final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+ try {
+ resourceGroupResources().create(resourceGroupPath, rgConfig);
+ log.info("[{}] Created ResourceGroup {}", clientAppId(), rgName);
+ } catch (MetadataStoreException.AlreadyExistsException e) {
+ log.warn("[{}] Failed to create ResourceGroup {} - already exists", clientAppId(), rgName);
+ throw new RestException(Response.Status.CONFLICT, "ResourceGroup already exists");
+ } catch (Exception e) {
+ log.error("[{}] Failed to create ResourceGroup {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+
+ }
+ protected void internalCreateOrUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
+ try {
+ validateSuperUserAccess();
+ checkNotNull(rgConfig);
+ /*
+ * see if ResourceGroup exists and treat the request as a update if it does.
+ */
+ final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+ boolean rgExists = false;
+ try {
+ rgExists = resourceGroupResources().exists(resourceGroupPath);
+ } catch (Exception e) {
+ log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
+ }
+
+ try {
+ if (rgExists) {
+ internalUpdateResourceGroup(rgName, rgConfig);
+ } else {
+ internalCreateResourceGroup(rgName, rgConfig);
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to create/update ResourceGroup {}: {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ }
+
+ protected boolean internalCheckRgInUse(String rgName) {
+ List<String> tenants;
+ try {
+ tenants = tenantResources().getChildren(path(POLICIES));
+ Iterator<String> tenantsIterator = tenants.iterator();
+ while (tenantsIterator.hasNext()) {
+ String tenant = tenantsIterator.next();
+ List<String> namespaces = getListOfNamespaces(tenant);
+ Iterator<String> namespaceIterator = namespaces.iterator();
+ while (namespaceIterator.hasNext()) {
+ String namespace = namespaceIterator.next();
+ Policies policies = getNamespacePolicies(NamespaceName.get(namespace));
+ if (null != policies && rgName.equals(policies.resource_group_name)) {
+ return true;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to get tenant/namespace list {}: {}", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ return false;
+ }
+
+ protected void internalDeleteResourceGroup(String rgName) {
+ /*
+ * need to walk the namespaces and make sure it is not in use
+ */
+ try {
+ /*
+ * walk the namespaces and make sure it is not in use.
+ */
+ if (internalCheckRgInUse(rgName)) {
+ throw new RestException(Response.Status.PRECONDITION_FAILED, "ResourceGroup is in use");
+ }
+ final String globalZkResourceGroupPath = path(RESOURCEGROUPS, rgName);
+ resourceGroupResources().delete(globalZkResourceGroupPath);
+ log.info("[{}] Deleted ResourceGroup {}", clientAppId(), rgName);
+ } catch (Exception e) {
+ log.error("[{}] Failed to delete ResourceGroup {}.", clientAppId(), rgName, e);
+ throw new RestException(e);
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ResourceGroupsBase.class);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9afcb24..315ef22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1718,5 +1718,46 @@ public class Namespaces extends NamespacesBase {
validateNamespaceName(tenant, namespace);
internalRemoveMaxTopicsPerNamespace();
}
+
+ @GET
+ @Path("/{tenant}/{namespace}/resourcegroup")
+ @ApiOperation(value = "Get the resourcegroup attached to the namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
+ public String getNamespaceResourceGroup(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.RESOURCEGROUP,
+ PolicyOperation.READ);
+
+ Policies policies = getNamespacePolicies(namespaceName);
+ return policies.resource_group_name;
+ }
+
+ @POST
+ @Path("/{tenant}/{namespace}/resourcegroup")
+ @ApiOperation(value = "Set resourcegroup for a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 412, message = "Invalid resourcegroup") })
+ public void setNamespaceResourceGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ @ApiParam(value = "Name of resourcegroup", required = true) String rgName) {
+ validateNamespaceName(tenant, namespace);
+ internalSetNamespaceResourceGroup(rgName);
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/resourcegroup")
+ @ApiOperation(value = "Delete resourcegroup for a namespace")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 412, message = "Invalid resourcegroup")})
+ public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetNamespaceResourceGroup(null);
+ }
+
+
private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java
new file mode 100644
index 0000000..f89ea9d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.broker.admin.impl.ResourceGroupsBase;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+@Path("/resourcegroups")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/resourcegroups", description = "ResourceGroups admin apis", tags = "resourcegroups")
+public class ResourceGroups extends ResourceGroupsBase {
+
+ @GET
+ @ApiOperation(value = "Get the list of all the resourcegroups.",
+ response = String.class, responseContainer = "Set")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+ public List<String> getResourceGroups() {
+ return internalGetResourceGroups();
+ }
+
+ @GET
+ @Path("/{resourcegroup}")
+ @ApiOperation(value = "Get the rate limiters specified for a resourcegroup.", response = ResourceGroup.class)
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "ResourceGroup doesn't exist")})
+ public ResourceGroup getResourceGroup(@PathParam("resourcegroup") String resourcegroup) {
+ return internalGetResourceGroup(resourcegroup);
+ }
+
+ @PUT
+ @Path("/{resourcegroup}")
+ @ApiOperation(value = "Creates a new resourcegroup with the specified rate limiters")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "cluster doesn't exist")})
+ public void createOrUpdateResourceGroup(@PathParam("resourcegroup") String name,
+ @ApiParam(value = "Rate limiters for the resourcegroup")
+ ResourceGroup resourcegroup) {
+ internalCreateOrUpdateResourceGroup(name, resourcegroup);
+ }
+
+ @DELETE
+ @Path("/{resourcegroup}")
+ @ApiOperation(value = "Delete a resourcegroup.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "ResourceGroup doesn't exist"),
+ @ApiResponse(code = 409, message = "ResourceGroup is in use")})
+ public void deleteResourceGroup(@PathParam("resourcegroup") String resourcegroup) {
+ internalDeleteResourceGroup(resourcegroup);
+ }
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 32b6b98..835988e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
@@ -63,6 +64,7 @@ import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyResources;
import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
@@ -887,6 +889,10 @@ public abstract class PulsarWebResource {
return pulsar().getPulsarResources().getNamespaceResources();
}
+ protected ResourceGroupResources resourceGroupResources() {
+ return pulsar().getPulsarResources().getResourcegroupResources();
+ }
+
protected LocalPoliciesResources getLocalPolicies() {
return pulsar().getPulsarResources().getLocalPolicies();
}
@@ -1074,6 +1080,24 @@ public abstract class PulsarWebResource {
return namespaces;
}
+ /**
+ * Get the list of resourcegroups.
+ *
+ * @return the list of resourcegroups
+ */
+
+ protected List<String> getListOfResourcegroups(String property) throws Exception {
+ List<String> resourcegroups = Lists.newArrayList();
+
+ for (String resourcegroup : resourceGroupResources().getChildren(path(RESOURCEGROUPS))) {
+ resourcegroups.add(resourcegroup);
+ }
+
+ resourcegroups.sort(null);
+ return resourcegroups;
+ }
+
+
public static void deleteRecursive(BaseResources resources, final String pathRoot) throws MetadataStoreException {
PathUtils.validatePath(pathRoot);
List<String> tree = listSubTreeBFS(resources, pathRoot);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java
new file mode 100644
index 0000000..7c240d5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.broker.admin.v2.ResourceGroups;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.*;
+
+public class ResourceGroupsTest extends MockedPulsarServiceBaseTest {
+ private ResourceGroups resourcegroups;
+ private List<String> expectedRgNames = Lists.newArrayList();
+ private final String testCluster = "test";
+ private final String testTenant = "test-tenant";
+ private final String testNameSpace = "test-tenant/test-namespace";
+
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ resourcegroups = spy(new ResourceGroups());
+ resourcegroups.setServletContext(new MockServletContext());
+ resourcegroups.setPulsar(pulsar);
+ doReturn(mockZooKeeper).when(resourcegroups).localZk();
+ doReturn(false).when(resourcegroups).isRequestHttps();
+ doReturn("test").when(resourcegroups).clientAppId();
+ doReturn(null).when(resourcegroups).originalPrincipal();
+ doReturn(null).when(resourcegroups).clientAuthData();
+
+ prepareData();
+ }
+
+ @AfterClass(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testCrudResourceGroups() throws Exception {
+ // create with null resourcegroup should fail.
+ try {
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-invalid", null);
+ fail("should have failed");
+ } catch (RestException e){
+ //Ok.
+ }
+
+ // create resourcegroup with default values
+ ResourceGroup testResourceGroupOne = new ResourceGroup();
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-one", testResourceGroupOne);
+ expectedRgNames.add("test-resourcegroup-one");
+
+ // create resourcegroup with non default values.
+ ResourceGroup testResourceGroupTwo = new ResourceGroup();
+ testResourceGroupTwo.setDispatchRateInBytes(10000);
+ testResourceGroupTwo.setDispatchRateInMsgs(100);
+ testResourceGroupTwo.setPublishRateInMsgs(100);
+ testResourceGroupTwo.setPublishRateInBytes(10000);
+
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-two", testResourceGroupTwo);
+ expectedRgNames.add("test-resourcegroup-two");
+
+ // null resourcegroup update should fail.
+ try {
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-one", null);
+ fail("should have failed");
+ } catch (RestException e){
+ //Ok.
+ }
+
+ // update with some real values
+ ResourceGroup testResourceGroupOneUpdate = new ResourceGroup();
+ testResourceGroupOneUpdate.setDispatchRateInMsgs(50);
+ testResourceGroupOneUpdate.setDispatchRateInBytes(5000);
+ testResourceGroupOneUpdate.setPublishRateInMsgs(10);
+ testResourceGroupOneUpdate.setPublishRateInBytes(1000);
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-one", testResourceGroupOneUpdate);
+
+ // get a non existent resourcegroup
+ try {
+ resourcegroups.getResourceGroup("test-resourcegroup-invalid");
+ fail("should have failed");
+ } catch (RestException e) {
+ //Ok
+ }
+
+ // get list of all resourcegroups
+ List<String> gotRgNames = resourcegroups.getResourceGroups();
+ assertEquals(gotRgNames.size(), expectedRgNames.size());
+ Collections.sort(gotRgNames);
+ Collections.sort(expectedRgNames);
+ assertEquals(gotRgNames, expectedRgNames);
+
+ // delete a non existent resourcegroup
+ try {
+ resourcegroups.deleteResourceGroup("test-resourcegroup-invalid");
+ fail("should have failed");
+ } catch (RestException e) {
+ //Ok
+ }
+
+ // delete the ResourceGroups we created.
+ Iterator<String> rg_Iterator = expectedRgNames.iterator();
+ while (rg_Iterator.hasNext()) {
+ resourcegroups.deleteResourceGroup(rg_Iterator.next());
+ }
+ }
+
+ @Test
+ public void testNamespaceResourceGroup() throws Exception {
+ // create resourcegroup with non default values.
+ ResourceGroup testResourceGroupTwo = new ResourceGroup();
+ testResourceGroupTwo.setDispatchRateInBytes(10000);
+ testResourceGroupTwo.setDispatchRateInMsgs(100);
+ testResourceGroupTwo.setPublishRateInMsgs(100);
+ testResourceGroupTwo.setPublishRateInBytes(10000);
+
+ resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-three", testResourceGroupTwo);
+ admin.namespaces().createNamespace(testNameSpace);
+ // set invalid ResourceGroup in namespace
+ try {
+ admin.namespaces().setNamespaceResourceGroup(testNameSpace, "test-resourcegroup-invalid");
+ fail("should have failed");
+ } catch (Exception e){
+ //Ok.
+ }
+ // set resourcegroup in namespace
+ admin.namespaces().setNamespaceResourceGroup(testNameSpace, "test-resourcegroup-three");
+ // try deleting the resourcegroup, should fail
+ try {
+ resourcegroups.deleteResourceGroup("test-resourcegroup-three");
+ } catch (RestException e) {
+ //Ok
+ }
+ // remove resourcegroup from namespace
+ admin.namespaces().removeNamespaceResourceGroup(testNameSpace);
+ resourcegroups.deleteResourceGroup("test-resourcegroup-three");
+
+ }
+
+ private void prepareData() throws PulsarAdminException {
+ admin.clusters().createCluster(testCluster, new ClusterData(pulsar.getBrokerServiceUrl()));
+ admin.tenants().createTenant(testTenant,
+ new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testCluster)));
+ }
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 703269a..9d43fc6 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3897,4 +3897,95 @@ public interface Namespaces {
* Unexpected error
*/
CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace);
+
+ /**
+ * Get the ResourceGroup for a namespace.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ * @return
+ */
+ String getNamespaceResourceGroup(String namespace) throws PulsarAdminException;
+
+ /**
+ * Get the ResourceGroup for a namespace asynchronously.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<String> getNamespaceResourceGroupAsync(String namespace);
+
+ /**
+ * Set the ResourceGroup for a namespace.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param resourcegroupname
+ * ResourceGroup name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setNamespaceResourceGroup(String namespace, String resourcegroupname) throws PulsarAdminException;
+
+ /**
+ * Set the ResourceGroup for a namespace asynchronously.
+ * <p/>
+ * Request example:
+ *
+ * <pre>
+ * <code>60</code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param resourcegroupname
+ * TTL values for all messages for all topics in this namespace
+ */
+ CompletableFuture<Void> setNamespaceResourceGroupAsync(String namespace, String resourcegroupname);
+
+ /**
+ * Remove the ResourceGroup on a namespace.
+ * @param namespace
+ * @throws PulsarAdminException
+ */
+ void removeNamespaceResourceGroup(String namespace) throws PulsarAdminException;
+
+ /**
+ * Remove the ResourceGroup on a namespace asynchronously.
+ * @param namespace
+ * @return
+ */
+ CompletableFuture<Void> removeNamespaceResourceGroupAsync(String namespace);
+
+
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 58b4fd2..ed5e13d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -54,6 +54,11 @@ public interface PulsarAdmin extends Closeable {
Tenants tenants();
/**
+ * @return the resourcegroups managements object
+ */
+ ResourceGroups resourcegroups();
+
+ /**
*
* @deprecated since 2.0. See {@link #tenants()}
*/
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java
new file mode 100644
index 0000000..bbed3db
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+/**
+ * Admin interface for ResourceGroups management.
+ */
+
+public interface ResourceGroups {
+
+ /**
+ * Get the list of resourcegroups.
+ * <p/>
+ * Get the list of all the resourcegroup.
+ * <p/>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["resourcegroup1",
+ * "resourcegroup2",
+ * "resourcegroup3"]</code>
+ * </pre>
+ *
+ * @throws PulsarAdminException.NotAuthorizedException Don't have admin permission
+ * @throws PulsarAdminException Unexpected error
+ */
+ List<String> getResourceGroups() throws PulsarAdminException;
+
+ /**
+ * Get the list of resourcegroups asynchronously.
+ * <p/>
+ * Get the list of all the resourcegrops.
+ * <p/>
+ * Response Example:
+ *
+ * <pre>
+ * <code>["resourcegroup1",
+ * "resourcegroup2",
+ * "resourcegroup3"]</code>
+ * </pre>
+ */
+ CompletableFuture<List<String>> getResourceGroupsAsync();
+
+
+ /**
+ * Get configuration for a resourcegroup.
+ * <p/>
+ * Get configuration specified for a resourcegroup.
+ * <p/>
+ * Response Example:
+ *
+ * <pre>
+ * <code>
+ * "publishRateInMsgs" : "value",
+ * "PublishRateInBytes" : "value",
+ * "DispatchRateInMsgs" : "value",
+ * "DispatchRateInBytes" : "value"
+ * </code>
+ * </pre>
+ *
+ * @param resourcegroup String resourcegroup
+ * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+ * @throws PulsarAdminException.NotFoundException Resourcegroup does not exist
+ * @throws PulsarAdminException Unexpected error
+ * @see ResourceGroup
+ * <p>
+ * *
+ */
+ ResourceGroup getResourceGroup(String resourcegroup) throws PulsarAdminException;
+
+ /**
+ * Get policies for a namespace asynchronously.
+ * <p/>
+ * Get cnfiguration specified for a resourcegroup.
+ * <p/>
+ * Response example:
+ *
+ * <pre>
+ * <code>
+ * "publishRateInMsgs" : "value",
+ * "PublishRateInBytes" : "value",
+ * "DispatchRateInMsgs" : "value",
+ * "DspatchRateInBytes" : "value"
+ * </code>
+ * </pre>
+ *
+ * @param resourcegroup Namespace name
+ * @see ResourceGroup
+ */
+ CompletableFuture<ResourceGroup> getResourceGroupAsync(String resourcegroup);
+
+ /**
+ * Create a new resourcegroup.
+ * <p/>
+ * Creates a new reourcegroup with the configuration specified.
+ *
+ * @param name resourcegroup name
+ * @param resourcegroup ResourceGroup configuration
+ * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+ * @throws PulsarAdminException.ConflictException Resourcegroup already exists
+ * @throws PulsarAdminException Unexpected error
+ */
+ void createResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException;
+
+ /**
+ * Create a new resourcegroup.
+ * <p/>
+ * Creates a new resourcegroup with the configuration specified.
+ *
+ * @param name resourcegroup name
+ * @param resourcegroup ResourceGroup configuration.
+ */
+ CompletableFuture<Void> createResourceGroupAsync(String name, ResourceGroup resourcegroup);
+
+ /**
+ * Update the configuration for a ResourceGroup.
+ * <p/>
+ * This operation requires Pulsar super-user privileges.
+ *
+ * @param name resourcegroup name
+ * @param resourcegroup resourcegroup configuration
+ *
+ * @throws PulsarAdminException.NotAuthorizedException
+ * Don't have admin permission
+ * @throws PulsarAdminException.NotFoundException
+ * ResourceGroup does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void updateResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException;
+
+ /**
+ * Update the configuration for a ResourceGroup.
+ * <p/>
+ * This operation requires Pulsar super-user privileges.
+ *
+ * @param name resourcegroup name
+ * @param resourcegroup resourcegroup configuration
+ */
+ CompletableFuture<Void> updateResourceGroupAsync(String name, ResourceGroup resourcegroup);
+
+
+ /**
+ * Delete an existing resourcegroup.
+ * <p/>
+ * The resourcegroup needs to unused and not attached to any entity.
+ *
+ * @param resourcegroup Resourcegroup name
+ * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+ * @throws PulsarAdminException.NotFoundException Resourcegroup does not exist
+ * @throws PulsarAdminException.ConflictException Resourcegroup is in use
+ * @throws PulsarAdminException Unexpected error
+ */
+ void deleteResourceGroup(String resourcegroup) throws PulsarAdminException;
+
+ /**
+ * Delete an existing resourcegroup.
+ * <p/>
+ * The resourcegroup needs to unused and not attached to any entity.
+ *
+ * @param resourcegroup Resourcegroup name
+ */
+
+ CompletableFuture<Void> deleteResourceGroupAsync(String resourcegroup);
+
+}
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index e110645..da0738a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -3365,6 +3365,87 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
return asyncDeleteRequest(path);
}
+
+ @Override
+ public String getNamespaceResourceGroup(String namespace) throws PulsarAdminException {
+ try {
+ return getNamespaceResourceGroupAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<String> getNamespaceResourceGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "resourcegroup");
+ final CompletableFuture<String> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<String>() {
+ @Override
+ public void completed(String rgName) {
+ future.complete(rgName);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void setNamespaceResourceGroup(String namespace, String resourcegroupname) throws PulsarAdminException {
+ try {
+ setNamespaceResourceGroupAsync(namespace, resourcegroupname)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> setNamespaceResourceGroupAsync(String namespace, String resourcegroupname) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "resourcegroup");
+ return asyncPostRequest(path, Entity.entity(resourcegroupname, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void removeNamespaceResourceGroup(String namespace) throws PulsarAdminException {
+ try {
+ removeNamespaceResourceGroupAsync(namespace)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeNamespaceResourceGroupAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "resourcegroup");
+ return asyncDeleteRequest(path);
+ }
+
+
private WebTarget namespacePath(NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index f3ae151..8874902 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.Properties;
import org.apache.pulsar.client.admin.ProxyStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.ResourceGroups;
import org.apache.pulsar.client.admin.ResourceQuotas;
import org.apache.pulsar.client.admin.Schemas;
import org.apache.pulsar.client.admin.Sink;
@@ -81,6 +82,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
private final BrokerStats brokerStats;
private final ProxyStats proxyStats;
private final Tenants tenants;
+ private final ResourceGroups resourcegroups;
private final Properties properties;
private final Namespaces namespaces;
private final Bookies bookies;
@@ -205,6 +207,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
this.proxyStats = new ProxyStatsImpl(root, auth, readTimeoutMs);
this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
+ this.resourcegroups = new ResourceGroupsImpl(root, auth, readTimeoutMs);
this.properties = new TenantsImpl(root, auth, readTimeoutMs);
this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
this.topics = new TopicsImpl(root, auth, readTimeoutMs);
@@ -306,6 +309,13 @@ public class PulsarAdminImpl implements PulsarAdmin {
}
/**
+ * @return the resourcegroups management object
+ */
+ public ResourceGroups resourcegroups() {
+ return resourcegroups;
+ }
+
+ /**
*
* @deprecated since 2.0. See {@link #tenants()}
*/
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
new file mode 100644
index 0000000..b099087
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.ResourceGroups;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+
+public class ResourceGroupsImpl extends BaseResource implements ResourceGroups {
+ private final WebTarget adminResourceGroups;
+
+ public ResourceGroupsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+ super(auth, readTimeoutMs);
+ adminResourceGroups = web.path("/admin/v2/resourcegroups");
+ }
+
+ @Override
+ public List<String> getResourceGroups() throws PulsarAdminException {
+ try {
+ return getResourceGroupsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<List<String>> getResourceGroupsAsync() {
+ final CompletableFuture<List<String>> future = new CompletableFuture<>();
+ asyncGetRequest(adminResourceGroups,
+ new InvocationCallback<List<String>>() {
+ @Override
+ public void completed(List<String> resourcegroups) {
+ future.complete(resourcegroups);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public ResourceGroup getResourceGroup(String resourcegroup) throws PulsarAdminException {
+ try {
+ return getResourceGroupAsync(resourcegroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<ResourceGroup> getResourceGroupAsync(String name) {
+ WebTarget path = adminResourceGroups.path(name);
+ final CompletableFuture<ResourceGroup> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<ResourceGroup>() {
+ @Override
+ public void completed(ResourceGroup resourcegroup) {
+ future.complete(resourcegroup);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public void createResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException {
+ try {
+ createResourceGroupAsync(name, resourcegroup)
+ .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> createResourceGroupAsync(String name, ResourceGroup resourcegroup) {
+ WebTarget path = adminResourceGroups.path(name);
+ return asyncPutRequest(path, Entity.entity(resourcegroup, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void updateResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException {
+ try {
+ updateResourceGroupAsync(name, resourcegroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> updateResourceGroupAsync(String name, ResourceGroup resourcegroup) {
+ WebTarget path = adminResourceGroups.path(name);
+ return asyncPutRequest(path, Entity.entity(resourcegroup, MediaType.APPLICATION_JSON));
+ }
+
+ @Override
+ public void deleteResourceGroup(String name) throws PulsarAdminException {
+ try {
+ deleteResourceGroupAsync(name).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteResourceGroupAsync(String name) {
+ WebTarget path = adminResourceGroups.path(name);
+ return asyncDeleteRequest(path);
+ }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7a2e52d..0afe299 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2086,6 +2086,45 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get ResourceGroup for a namespace")
+ private class GetResourceGroup extends CliCommand {
+ @Parameter(description = "tenant/namespace\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ print(getAdmin().namespaces().getNamespaceResourceGroup(namespace));
+ }
+ }
+
+ @Parameters(commandDescription = "Set ResourceGroup for a namespace")
+ private class SetResourceGroup extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--resource-group-name", "-rgn" }, description = "ResourceGroup name", required = true)
+ private String rgName;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ getAdmin().namespaces().setNamespaceResourceGroup(namespace, rgName);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove ResourceGroup from a namespace")
+ private class RemoveResourceGroup extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ getAdmin().namespaces().removeNamespaceResourceGroup(namespace);
+ }
+ }
+
public CmdNamespaces(Supplier<PulsarAdmin> admin) {
super("namespaces", admin);
jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -2246,5 +2285,10 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-max-topics-per-namespace", new SetMaxTopicsPerNamespace());
jcommander.addCommand("get-max-topics-per-namespace", new GetMaxTopicsPerNamespace());
jcommander.addCommand("remove-max-topics-per-namespace", new RemoveMaxTopicsPerNamespace());
+
+ jcommander.addCommand("get-resource-group", new GetResourceGroup());
+ jcommander.addCommand("set-resource-group", new SetResourceGroup());
+ jcommander.addCommand("remove-resource-group", new RemoveResourceGroup());
+
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java
new file mode 100644
index 0000000..74ac764
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+import java.util.function.Supplier;
+
+@Parameters(commandDescription = "Operations about ResourceGroups")
+public class CmdResourceGroups extends CmdBase {
+ @Parameters(commandDescription = "List the existing resourcegroups")
+ private class List extends CliCommand {
+ @Override
+ void run() throws PulsarAdminException {
+ print(getAdmin().resourcegroups().getResourceGroups());
+ }
+ }
+
+ @Parameters(commandDescription = "Gets the configuration of a resourcegroup")
+ private class Get extends CliCommand {
+ @Parameter(description = "resourcegroup-name", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String name = getOneArgument(params);
+ print(getAdmin().resourcegroups().getResourceGroup(name));
+ }
+ }
+ @Parameters(commandDescription = "Creates a new resourcegroup")
+ private class Create extends CliCommand {
+ @Parameter(description = "resourcegroup-name", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-publish-rate",
+ "-mp" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int publishRateInMsgs = -1;
+
+ @Parameter(names = { "--byte-publish-rate",
+ "-bp" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long publishRateInBytes = -1;
+
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int dispatchRateInMsgs = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long dispatchRateInBytes = -1;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String name = getOneArgument(params);
+
+ ResourceGroup resourcegroup = new ResourceGroup();
+ resourcegroup.setDispatchRateInMsgs(dispatchRateInMsgs);
+ resourcegroup.setDispatchRateInBytes(dispatchRateInBytes);
+ resourcegroup.setPublishRateInMsgs(publishRateInMsgs);
+ resourcegroup.setPublishRateInBytes(publishRateInBytes);
+ getAdmin().resourcegroups().createResourceGroup(name, resourcegroup);
+ }
+ }
+
+ @Parameters(commandDescription = "Updates a resourcegroup")
+ private class Update extends CliCommand {
+ @Parameter(description = "resourcegroup-name", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-publish-rate",
+ "-mp" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int publishRateInMsgs = -1;
+
+ @Parameter(names = { "--byte-publish-rate",
+ "-bp" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long publishRateInBytes = -1;
+
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private int dispatchRateInMsgs = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+ private long dispatchRateInBytes = -1;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String name = getOneArgument(params);
+
+ ResourceGroup resourcegroup = new ResourceGroup();
+ resourcegroup.setDispatchRateInMsgs(dispatchRateInMsgs);
+ resourcegroup.setDispatchRateInBytes(dispatchRateInBytes);
+ resourcegroup.setPublishRateInMsgs(publishRateInMsgs);
+ resourcegroup.setPublishRateInBytes(publishRateInBytes);
+
+ getAdmin().resourcegroups().updateResourceGroup(name, resourcegroup);
+ }
+ }
+
+ @Parameters(commandDescription = "Deletes an existing ResourceGroup")
+ private class Delete extends CliCommand {
+ @Parameter(description = "resourcegroup-name", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String name = getOneArgument(params);
+ getAdmin().resourcegroups().deleteResourceGroup(name);
+ }
+ }
+
+
+ public CmdResourceGroups(Supplier<PulsarAdmin> admin) {
+ super("resourcegroups", admin);
+ jcommander.addCommand("list", new CmdResourceGroups.List());
+ jcommander.addCommand("get", new CmdResourceGroups.Get());
+ jcommander.addCommand("create", new CmdResourceGroups.Create());
+ jcommander.addCommand("update", new CmdResourceGroups.Update());
+ jcommander.addCommand("delete", new CmdResourceGroups.Delete());
+ }
+
+
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index d50f199..a62761b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -126,6 +126,7 @@ public class PulsarAdminTool {
commandMap.put("brokers", CmdBrokers.class);
commandMap.put("broker-stats", CmdBrokerStats.class);
commandMap.put("tenants", CmdTenants.class);
+ commandMap.put("resourcegroups", CmdResourceGroups.class);
commandMap.put("properties", CmdTenants.CmdProperties.class); // deprecated, doesn't show in usage()
commandMap.put("namespaces", CmdNamespaces.class);
commandMap.put("topics", CmdTopics.class);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 72d6f6f..c9ae529 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -124,6 +124,9 @@ public class Policies {
public Set<SubType> subscription_types_enabled = Sets.newHashSet();
+ @SuppressWarnings("checkstype:MemberName")
+ public String resource_group_name = null;
+
@Override
public int hashCode() {
return Objects.hash(auth_policies, replication_clusters,
@@ -145,7 +148,8 @@ public class Policies {
schema_compatibility_strategy,
is_allow_auto_update_schema,
offload_policies,
- subscription_types_enabled);
+ subscription_types_enabled,
+ resource_group_name);
}
@Override
@@ -187,7 +191,8 @@ public class Policies {
&& schema_compatibility_strategy == other.schema_compatibility_strategy
&& is_allow_auto_update_schema == other.is_allow_auto_update_schema
&& Objects.equals(offload_policies, other.offload_policies)
- && Objects.equals(subscription_types_enabled, other.subscription_types_enabled);
+ && Objects.equals(subscription_types_enabled, other.subscription_types_enabled)
+ && Objects.equals(resource_group_name, other.resource_group_name);
}
return false;
@@ -247,7 +252,8 @@ public class Policies {
.add("schema_compatibility_Strategy", schema_compatibility_strategy)
.add("is_allow_auto_update_Schema", is_allow_auto_update_schema)
.add("offload_policies", offload_policies)
- .add("subscription_types_enabled", subscription_types_enabled).toString();
+ .add("subscription_types_enabled", subscription_types_enabled)
+ .add("resource_group_name", resource_group_name).toString();
}
private static final long MAX_BUNDLES = ((long) 1) << 32;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 81cfb2a..7172d50 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -45,5 +45,6 @@ public enum PolicyName {
SUBSCRIPTION_AUTH_MODE,
ENCRYPTION,
TTL,
- MAX_TOPICS
+ MAX_TOPICS,
+ RESOURCEGROUP
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
similarity index 63%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
index 81cfb2a..046c2d2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
@@ -16,34 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.pulsar.common.policies.data;
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
- ALL,
- ANTI_AFFINITY,
- BACKLOG,
- COMPACTION,
- DELAYED_DELIVERY,
- INACTIVE_TOPIC,
- DEDUPLICATION,
- MAX_CONSUMERS,
- MAX_PRODUCERS,
- DEDUPLICATION_SNAPSHOT,
- MAX_UNACKED,
- MAX_SUBSCRIPTIONS,
- OFFLOAD,
- PERSISTENCE,
- RATE,
- RETENTION,
- REPLICATION,
- REPLICATION_RATE,
- SCHEMA_COMPATIBILITY_STRATEGY,
- SUBSCRIPTION_AUTH_MODE,
- ENCRYPTION,
- TTL,
- MAX_TOPICS
+import com.google.common.base.MoreObjects;
+import lombok.Data;
+
+import java.util.Objects;
+
+@Data public class ResourceGroup {
+ private int publishRateInMsgs = -1;
+ private long publishRateInBytes = -1;
+ private int dispatchRateInMsgs = -1;
+ private long dispatchRateInBytes = -1;
}