You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/04/18 18:03:06 UTC

[pulsar] branch master updated: [PIP-82] [pulsar-broker] CRUD support for ResourceGroup (#10218)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 86e89ac  [PIP-82] [pulsar-broker] CRUD support for ResourceGroup (#10218)
86e89ac is described below

commit 86e89acee615dcc31ba6eebb38cf1749466daafa
Author: ravi-vaidyanathan <79...@users.noreply.github.com>
AuthorDate: Sun Apr 18 11:02:00 2021 -0700

    [PIP-82] [pulsar-broker] CRUD support for ResourceGroup (#10218)
    
    * CRUD support for ResourceGroup
    
    * remove ZK cache, use lombok @data
    
    Co-authored-by: Ravi Vaidyanathan <rv...@splunk.com>
    Co-authored-by: Matteo Merli <mm...@apache.org>
---
 .../broker/cache/ConfigurationCacheService.java    |   2 +
 .../pulsar/broker/resources/PulsarResources.java   |   2 +
 .../broker/resources/ResourceGroupResources.java   |  35 +---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  24 ++-
 .../broker/admin/impl/ResourceGroupsBase.java      | 180 ++++++++++++++++++++
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  41 +++++
 .../pulsar/broker/admin/v2/ResourceGroups.java     |  82 +++++++++
 .../pulsar/broker/web/PulsarWebResource.java       |  24 +++
 .../pulsar/broker/admin/ResourceGroupsTest.java    | 181 ++++++++++++++++++++
 .../org/apache/pulsar/client/admin/Namespaces.java |  91 ++++++++++
 .../apache/pulsar/client/admin/PulsarAdmin.java    |   5 +
 .../apache/pulsar/client/admin/ResourceGroups.java | 186 +++++++++++++++++++++
 .../client/admin/internal/NamespacesImpl.java      |  81 +++++++++
 .../client/admin/internal/PulsarAdminImpl.java     |  10 ++
 .../client/admin/internal/ResourceGroupsImpl.java  | 169 +++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  44 +++++
 .../apache/pulsar/admin/cli/CmdResourceGroups.java | 145 ++++++++++++++++
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |   1 +
 .../pulsar/common/policies/data/Policies.java      |  12 +-
 .../pulsar/common/policies/data/PolicyName.java    |   3 +-
 .../data/{PolicyName.java => ResourceGroup.java}   |  38 ++---
 21 files changed, 1295 insertions(+), 61 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 33b30f2..fa29bdf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -65,6 +66,7 @@ public class ConfigurationCacheService {
     private PulsarResources pulsarResources;
 
     public static final String POLICIES = "policies";
+    public static final String RESOURCEGROUPS = "resourcegroups";
     public static final String FAILURE_DOMAIN = "failureDomain";
     public final String CLUSTER_FAILURE_DOMAIN_ROOT;
     public static final String POLICIES_ROOT = "/admin/policies";
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index 48db47f..fa4853a 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -33,6 +33,7 @@ public class PulsarResources {
     public static final int DEFAULT_OPERATION_TIMEOUT_SEC = 30;
     private TenantResources tenantResources;
     private ClusterResources clusterResources;
+    private ResourceGroupResources resourcegroupResources;
     private NamespaceResources namespaceResources;
     private DynamicConfigurationResources dynamicConfigResources;
     private LocalPoliciesResources localPolicies;
@@ -49,6 +50,7 @@ public class PulsarResources {
             tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
             clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
             namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+            resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
         }
         if (localMetadataStore != null) {
             dynamicConfigResources = new DynamicConfigurationResources(localMetadataStore, operationTimeoutSec);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
similarity index 60%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
index 81cfb2a..7d6bf6f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ResourceGroupResources.java
@@ -16,34 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.broker.resources;
 
-package org.apache.pulsar.common.policies.data;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    INACTIVE_TOPIC,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    DEDUPLICATION_SNAPSHOT,
-    MAX_UNACKED,
-    MAX_SUBSCRIPTIONS,
-    OFFLOAD,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    ENCRYPTION,
-    TTL,
-    MAX_TOPICS
+/**
+ * Metadata-store accessor for {@link ResourceGroup} records.
+ *
+ * <p>Adds no behaviour of its own: it only binds {@link BaseResources} to the
+ * {@link ResourceGroup} type.
+ */
+public class ResourceGroupResources extends BaseResources<ResourceGroup> {
+    /**
+     * @param store metadata store passed through to {@link BaseResources}
+     * @param operationTimeoutSec operation timeout passed through to {@link BaseResources}
+     */
+    public ResourceGroupResources(MetadataStoreExtended store, int operationTimeoutSec) {
+        super(store, ResourceGroup.class, operationTimeoutSec);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0b53838..e33ddfa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.common.policies.data.Policies.getBundles;
 import com.google.common.collect.Lists;
@@ -2565,5 +2566,26 @@ public abstract class NamespacesBase extends AdminResource {
        }
    }
 
-   private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
+    /**
+     * Attaches a resource group to the current namespace, or detaches it when {@code rgName} is null.
+     *
+     * <p>Runs the RESOURCEGROUP/WRITE policy-operation check and the read-only-policies check, verifies
+     * that a non-null {@code rgName} names an existing resource group, and then stores the value in the
+     * {@code resource_group_name} policy field.
+     *
+     * @param rgName name of the resource group to attach; {@code null} skips the existence check and
+     *            stores {@code null}
+     * @throws RestException with {@code PRECONDITION_FAILED} when {@code rgName} does not exist, or
+     *             wrapping the underlying failure when the existence check itself fails
+     */
+    protected void internalSetNamespaceResourceGroup(String rgName) {
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RESOURCEGROUP, PolicyOperation.WRITE);
+        validatePoliciesReadOnlyAccess();
+
+        if (rgName != null) {
+            final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+            // check resourcegroup exists.
+            try {
+                if (!resourceGroupResources().exists(resourceGroupPath)) {
+                    throw new RestException(Status.PRECONDITION_FAILED, "ResourceGroup does not exist");
+                }
+            } catch (RestException re) {
+                // An unknown group is a client error, not a lookup failure: pass it through untouched
+                // instead of logging it at error level and re-wrapping it.
+                throw re;
+            } catch (Exception e) {
+                // The throwable goes last, without a placeholder of its own, so the logger records
+                // its stack trace rather than formatting it into the message.
+                log.error("[{}] Invalid ResourceGroup {}", clientAppId(), rgName, e);
+                throw new RestException(e);
+            }
+        }
+
+        internalSetPolicies("resource_group_name", rgName);
+    }
+
+
+    private static final Logger log = LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java
new file mode 100644
index 0000000..351f521
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceGroupsBase.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
+import java.util.Iterator;
+import java.util.List;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ResourceGroupsBase extends AdminResource {
+    /**
+     * Lists the names of all resource groups after checking super-user access.
+     *
+     * @return the list produced by {@code getListOfResourcegroups}
+     * @throws RestException as raised by the access check (passed through unchanged), with
+     *             {@code NOT_FOUND} on a {@code NoNodeException}, or wrapping any other failure
+     */
+    protected List<String> internalGetResourceGroups() {
+        try {
+            validateSuperUserAccess();
+            // NOTE(review): "abc" is a placeholder argument; the callee appears not to read its
+            // parameter - confirm before changing the call.
+            return getListOfResourcegroups("abc");
+        } catch (RestException re) {
+            // Keep the status chosen by the access check; do not log it as a server error.
+            throw re;
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to get ResourceGroups list ", clientAppId());
+            // Previously reported "Property does not exist", a message copied from tenant handling.
+            throw new RestException(Response.Status.NOT_FOUND, "ResourceGroups do not exist");
+        } catch (Exception e) {
+            // Throwable last, without its own placeholder, so the stack trace is logged.
+            log.error("[{}] Failed to get ResourceGroups list", clientAppId(), e);
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Reads the configuration of one resource group.
+     *
+     * <p>The caller must pass the super-user check, matching the list and create/update operations
+     * and the 403 response documented on the REST endpoint.
+     *
+     * @param rgName name of the resource group to read
+     * @return the stored {@link ResourceGroup}
+     * @throws RestException as raised by the access check, with {@code NOT_FOUND} when no such group
+     *             is stored, or wrapping any other failure
+     */
+    protected ResourceGroup internalGetResourceGroup(String rgName) {
+        try {
+            validateSuperUserAccess();
+            final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+            ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath)
+                    .orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));
+            return resourceGroup;
+        } catch (RestException re) {
+            // Access-denied and not-found responses already carry their status.
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to get ResourceGroup  {}", clientAppId(), rgName, e);
+            throw new RestException(e);
+        }
+    }
+
+    protected void internalUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
+        final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+
+        try {
+            ResourceGroup resourceGroup = resourceGroupResources().get(resourceGroupPath).orElseThrow(() ->
+                    new RestException(Response.Status.NOT_FOUND, "ResourceGroup does not exist"));
+
+            /*
+             * assuming read-modify-write
+             */
+            resourceGroup.setPublishRateInMsgs(rgConfig.getPublishRateInMsgs());
+            resourceGroup.setPublishRateInBytes(rgConfig.getPublishRateInBytes());
+            resourceGroup.setDispatchRateInMsgs(rgConfig.getDispatchRateInMsgs());
+            resourceGroup.setDispatchRateInBytes(rgConfig.getDispatchRateInBytes());
+
+            // write back the new ResourceGroup config.
+            resourceGroupResources().set(resourceGroupPath, r -> resourceGroup);
+            log.info("[{}] Successfully updated the ResourceGroup {}", clientAppId(), rgName);
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update configuration for ResourceGroup {}", clientAppId(), rgName, e);
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Stores a new resource group under the resourcegroups path.
+     *
+     * @param rgName name of the resource group to create
+     * @param rgConfig configuration to store
+     * @throws RestException with {@code CONFLICT} when the group already exists, or wrapping any
+     *             other failure
+     */
+    protected void internalCreateResourceGroup(String rgName, ResourceGroup rgConfig) {
+        final String rgPath = AdminResource.path(RESOURCEGROUPS, rgName);
+        try {
+            resourceGroupResources().create(rgPath, rgConfig);
+            log.info("[{}] Created ResourceGroup {}", clientAppId(), rgName);
+        } catch (MetadataStoreException.AlreadyExistsException alreadyThere) {
+            // A duplicate is the caller's mistake, so it is logged as a warning without a stack trace.
+            log.warn("[{}] Failed to create ResourceGroup {} - already exists", clientAppId(), rgName);
+            throw new RestException(Response.Status.CONFLICT, "ResourceGroup already exists");
+        } catch (Exception failure) {
+            log.error("[{}] Failed to create ResourceGroup {}", clientAppId(), rgName, failure);
+            throw new RestException(failure);
+        }
+    }
+    /**
+     * Creates the resource group when it is absent, otherwise updates its rate limits.
+     *
+     * <p>A failure of the existence check is now reported to the caller. Previously it was logged and
+     * then treated as "does not exist", which sent the request down the create path on a guess.
+     * Failures are logged once here instead of once per nesting level.
+     *
+     * @param rgName name of the resource group
+     * @param rgConfig configuration to store; checked with {@code checkNotNull}
+     * @throws RestException as raised by the access check or by the create/update helpers (passed
+     *             through unchanged), or wrapping any other failure, including a rejected null
+     *             {@code rgConfig}
+     */
+    protected void internalCreateOrUpdateResourceGroup(String rgName, ResourceGroup rgConfig) {
+        try {
+            validateSuperUserAccess();
+            checkNotNull(rgConfig);
+            /*
+             * see if ResourceGroup exists and treat the request as a update if it does.
+             */
+            final String resourceGroupPath = AdminResource.path(RESOURCEGROUPS, rgName);
+            if (resourceGroupResources().exists(resourceGroupPath)) {
+                internalUpdateResourceGroup(rgName, rgConfig);
+            } else {
+                internalCreateResourceGroup(rgName, rgConfig);
+            }
+        } catch (RestException re) {
+            // Already carries the intended status (and was logged where it was raised).
+            throw re;
+        } catch (Exception e) {
+            // Throwable last, without its own placeholder, so the stack trace is logged.
+            log.error("[{}] Failed to create/update ResourceGroup {}", clientAppId(), rgName, e);
+            throw new RestException(e);
+        }
+    }
+
+    /**
+     * Reports whether any namespace policy names {@code rgName} as its resource group.
+     *
+     * <p>Walks every tenant found under the policies path and every namespace of each tenant, and
+     * stops at the first match.
+     *
+     * @param rgName resource group name to look for
+     * @return {@code true} if some namespace's {@code resource_group_name} equals {@code rgName}
+     * @throws RestException wrapping any failure while listing tenants/namespaces or reading policies
+     */
+    protected boolean internalCheckRgInUse(String rgName) {
+        List<String> tenants;
+        try {
+            tenants = tenantResources().getChildren(path(POLICIES));
+            Iterator<String> tenantsIterator = tenants.iterator();
+            while (tenantsIterator.hasNext()) {
+                String tenant = tenantsIterator.next();
+                List<String> namespaces = getListOfNamespaces(tenant);
+                Iterator<String> namespaceIterator = namespaces.iterator();
+                while (namespaceIterator.hasNext()) {
+                    String namespace = namespaceIterator.next();
+                    Policies policies = getNamespacePolicies(NamespaceName.get(namespace));
+                    if (null != policies && rgName.equals(policies.resource_group_name)) {
+                        return true;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Throwable last, without its own placeholder, so the stack trace is logged.
+            log.error("[{}] Failed to get tenant/namespace list {}", clientAppId(), rgName, e);
+            throw new RestException(e);
+        }
+        return false;
+    }
+
+    /**
+     * Deletes a resource group unless a namespace still references it.
+     *
+     * <p>The caller must pass the super-user check, matching the list and create/update operations
+     * and the 403 response documented on the REST endpoint.
+     *
+     * @param rgName name of the resource group to delete
+     * @throws RestException as raised by the access check, with {@code PRECONDITION_FAILED} when the
+     *             group is in use, or wrapping any other failure (including deleting an unknown group)
+     */
+    protected void internalDeleteResourceGroup(String rgName) {
+        try {
+            validateSuperUserAccess();
+            /*
+             * walk the namespaces and make sure it is not in use; otherwise their policies would keep
+             * naming a group that no longer exists.
+             */
+            if (internalCheckRgInUse(rgName)) {
+                throw new RestException(Response.Status.PRECONDITION_FAILED, "ResourceGroup is in use");
+            }
+            final String globalZkResourceGroupPath = path(RESOURCEGROUPS, rgName);
+            resourceGroupResources().delete(globalZkResourceGroupPath);
+            log.info("[{}] Deleted ResourceGroup {}", clientAppId(), rgName);
+        } catch (RestException re) {
+            // "In use" and access-denied are client errors: pass them through without an error log.
+            throw re;
+        } catch (Exception e) {
+            log.error("[{}] Failed to delete ResourceGroup {}.", clientAppId(), rgName, e);
+            throw new RestException(e);
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ResourceGroupsBase.class);
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 9afcb24..315ef22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1718,5 +1718,46 @@ public class Namespaces extends NamespacesBase {
         validateNamespaceName(tenant, namespace);
         internalRemoveMaxTopicsPerNamespace();
     }
+
+    /**
+     * Returns the {@code resource_group_name} recorded in the namespace's policies.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @return the stored resource group name
+     */
+    @GET
+    @Path("/{tenant}/{namespace}/resourcegroup")
+    @ApiOperation(value = "Get the resourcegroup attached to the namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
+    public String getNamespaceResourceGroup(@PathParam("tenant") String tenant,
+                                      @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        final NamespaceName requested = NamespaceName.get(tenant, namespace);
+        validateNamespacePolicyOperation(requested, PolicyName.RESOURCEGROUP, PolicyOperation.READ);
+
+        return getNamespacePolicies(namespaceName).resource_group_name;
+    }
+
+    /**
+     * Attaches the named resource group to a namespace.
+     *
+     * <p>Validates the namespace name, then delegates to {@code internalSetNamespaceResourceGroup}.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     * @param rgName name of the resource group, taken from the request body
+     */
+    @POST
+    @Path("/{tenant}/{namespace}/resourcegroup")
+    @ApiOperation(value = "Set resourcegroup for a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid resourcegroup") })
+    public void setNamespaceResourceGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+                                          @ApiParam(value = "Name of resourcegroup", required = true) String rgName) {
+        validateNamespaceName(tenant, namespace);
+        internalSetNamespaceResourceGroup(rgName);
+    }
+
+    /**
+     * Detaches the resource group from a namespace.
+     *
+     * <p>Validates the namespace name, then delegates to {@code internalSetNamespaceResourceGroup}
+     * with a {@code null} name.
+     *
+     * @param tenant tenant path segment
+     * @param namespace namespace path segment
+     */
+    @DELETE
+    @Path("/{tenant}/{namespace}/resourcegroup")
+    @ApiOperation(value = "Delete resourcegroup for a namespace")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 412, message = "Invalid resourcegroup")})
+    public void removeNamespaceResourceGroup(@PathParam("tenant") String tenant,
+                                          @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        // Passing null stores a null resource_group_name, i.e. clears the association.
+        internalSetNamespaceResourceGroup(null);
+    }
+
+
     private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java
new file mode 100644
index 0000000..f89ea9d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ResourceGroups.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import java.util.List;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.broker.admin.impl.ResourceGroupsBase;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+/**
+ * REST endpoints under {@code /resourcegroups}.
+ *
+ * <p>Every endpoint delegates to the matching {@code internal*} method of {@link ResourceGroupsBase};
+ * this class only supplies the JAX-RS routing and the Swagger metadata.
+ */
+@Path("/resourcegroups")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+@Api(value = "/resourcegroups", description = "ResourceGroups admin apis", tags = "resourcegroups")
+public class ResourceGroups extends ResourceGroupsBase {
+
+    /**
+     * Lists the names of all resource groups.
+     *
+     * @return the resource group names
+     */
+    @GET
+    // responseContainer matches the declared List return type (it previously said "Set").
+    @ApiOperation(value = "Get the list of all the resourcegroups.",
+            response = String.class, responseContainer = "List")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission")})
+    public List<String> getResourceGroups() {
+        return internalGetResourceGroups();
+    }
+
+    /**
+     * Returns the configuration of one resource group.
+     *
+     * @param resourcegroup name of the resource group
+     * @return the stored configuration
+     */
+    @GET
+    @Path("/{resourcegroup}")
+    @ApiOperation(value = "Get the rate limiters specified for a resourcegroup.", response = ResourceGroup.class)
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "ResourceGroup doesn't exist")})
+    public ResourceGroup getResourceGroup(@PathParam("resourcegroup") String resourcegroup) {
+        return internalGetResourceGroup(resourcegroup);
+    }
+
+    /**
+     * Creates the resource group, or updates it when it already exists.
+     *
+     * @param name name of the resource group
+     * @param resourcegroup configuration taken from the request body
+     */
+    @PUT
+    @Path("/{resourcegroup}")
+    @ApiOperation(value = "Creates a new resourcegroup with the specified rate limiters")
+    // NOTE(review): the 404 text mentions a cluster, but no cluster is involved in this call - confirm
+    // the intended wording.
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "cluster doesn't exist")})
+    public void createOrUpdateResourceGroup(@PathParam("resourcegroup") String name,
+                                    @ApiParam(value = "Rate limiters for the resourcegroup")
+                                            ResourceGroup resourcegroup) {
+        internalCreateOrUpdateResourceGroup(name, resourcegroup);
+    }
+
+    /**
+     * Deletes a resource group that no namespace references.
+     *
+     * @param resourcegroup name of the resource group
+     */
+    @DELETE
+    @Path("/{resourcegroup}")
+    @ApiOperation(value = "Delete a resourcegroup.")
+    // The delete path raises PRECONDITION_FAILED for a group that is in use, so the documented code
+    // is 412 (it previously said 409).
+    @ApiResponses(value = {
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "ResourceGroup doesn't exist"),
+            @ApiResponse(code = 412, message = "ResourceGroup is in use")})
+    public void deleteResourceGroup(@PathParam("resourcegroup") String resourcegroup) {
+        internalDeleteResourceGroup(resourcegroup);
+    }
+}
+
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 32b6b98..835988e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.admin.AdminResource.POLICIES_READONLY_FLAG_PATH;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.broker.cache.ConfigurationCacheService.RESOURCEGROUPS;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Lists;
@@ -63,6 +64,7 @@ import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.resources.ResourceGroupResources;
 import org.apache.pulsar.broker.resources.TenantResources;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
@@ -887,6 +889,10 @@ public abstract class PulsarWebResource {
         return pulsar().getPulsarResources().getNamespaceResources();
     }
 
+    /**
+     * @return the {@link ResourceGroupResources} held by this broker's {@link PulsarResources}
+     */
+    protected ResourceGroupResources resourceGroupResources() {
+        return pulsar().getPulsarResources().getResourcegroupResources();
+    }
+
     protected LocalPoliciesResources getLocalPolicies() {
         return pulsar().getPulsarResources().getLocalPolicies();
     }
@@ -1074,6 +1080,24 @@ public abstract class PulsarWebResource {
         return namespaces;
     }
 
+    /**
+     * Collects the names of every resourcegroup in natural order.
+     *
+     * @param property not read by this method
+     * @return a new sorted list holding the children of the resourcegroups path
+     * @throws Exception whatever the underlying {@code getChildren} call raises
+     */
+    protected List<String> getListOfResourcegroups(String property) throws Exception {
+        // Copy first so the list handed back by the resource layer is never sorted in place.
+        List<String> names = Lists.newArrayList(resourceGroupResources().getChildren(path(RESOURCEGROUPS)));
+        names.sort(null);
+        return names;
+    }
+
+
     public static void deleteRecursive(BaseResources resources, final String pathRoot) throws MetadataStoreException {
         PathUtils.validatePath(pathRoot);
         List<String> tree = listSubTreeBFS(resources, pathRoot);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java
new file mode 100644
index 0000000..7c240d5
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/ResourceGroupsTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.broker.admin.v2.ResourceGroups;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.*;
+
+public class ResourceGroupsTest extends MockedPulsarServiceBaseTest  {
+    private ResourceGroups resourcegroups;
+    private List<String> expectedRgNames = Lists.newArrayList();
+    private final String testCluster = "test";
+    private final String testTenant = "test-tenant";
+    private final String testNameSpace = "test-tenant/test-namespace";
+
+
+    /**
+     * Starts the mocked broker and builds the {@link ResourceGroups} REST resource under test.
+     *
+     * <p>The resource is wrapped in a Mockito spy so that {@code localZk()}, {@code isRequestHttps()},
+     * {@code clientAppId()}, {@code originalPrincipal()} and {@code clientAuthData()} return fixed
+     * values. The cluster and tenant fixtures are created last.
+     */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        resourcegroups = spy(new ResourceGroups());
+        resourcegroups.setServletContext(new MockServletContext());
+        resourcegroups.setPulsar(pulsar);
+        doReturn(mockZooKeeper).when(resourcegroups).localZk();
+        doReturn(false).when(resourcegroups).isRequestHttps();
+        doReturn("test").when(resourcegroups).clientAppId();
+        doReturn(null).when(resourcegroups).originalPrincipal();
+        doReturn(null).when(resourcegroups).clientAuthData();
+
+        prepareData();
+    }
+
+    /** Tears down the mocked broker; runs even when earlier configuration or tests failed. */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Exercises create, update, get, list and delete on the REST resource, including the rejection of
+     * null configurations and of unknown group names.
+     */
+    @Test
+    public void testCrudResourceGroups() throws Exception {
+        final String groupOne = "test-resourcegroup-one";
+        final String groupTwo = "test-resourcegroup-two";
+        final String unknownGroup = "test-resourcegroup-invalid";
+
+        // Creating with a null configuration must be rejected.
+        try {
+            resourcegroups.createOrUpdateResourceGroup(unknownGroup, null);
+            fail("should have failed");
+        } catch (RestException expected) {
+            // expected
+        }
+
+        // Group one: every limit left at its default value.
+        resourcegroups.createOrUpdateResourceGroup(groupOne, new ResourceGroup());
+        expectedRgNames.add(groupOne);
+
+        // Group two: explicit limits.
+        ResourceGroup explicitLimits = new ResourceGroup();
+        explicitLimits.setDispatchRateInBytes(10000);
+        explicitLimits.setDispatchRateInMsgs(100);
+        explicitLimits.setPublishRateInMsgs(100);
+        explicitLimits.setPublishRateInBytes(10000);
+
+        resourcegroups.createOrUpdateResourceGroup(groupTwo, explicitLimits);
+        expectedRgNames.add(groupTwo);
+
+        // Updating an existing group with a null configuration must be rejected too.
+        try {
+            resourcegroups.createOrUpdateResourceGroup(groupOne, null);
+            fail("should have failed");
+        } catch (RestException expected) {
+            // expected
+        }
+
+        // A real update of group one.
+        ResourceGroup updatedLimits = new ResourceGroup();
+        updatedLimits.setDispatchRateInMsgs(50);
+        updatedLimits.setDispatchRateInBytes(5000);
+        updatedLimits.setPublishRateInMsgs(10);
+        updatedLimits.setPublishRateInBytes(1000);
+        resourcegroups.createOrUpdateResourceGroup(groupOne, updatedLimits);
+
+        // Reading an unknown group must fail.
+        try {
+            resourcegroups.getResourceGroup(unknownGroup);
+            fail("should have failed");
+        } catch (RestException expected) {
+            // expected
+        }
+
+        // The listing must contain exactly the groups created above.
+        List<String> listedNames = resourcegroups.getResourceGroups();
+        assertEquals(listedNames.size(), expectedRgNames.size());
+        Collections.sort(listedNames);
+        Collections.sort(expectedRgNames);
+        assertEquals(listedNames, expectedRgNames);
+
+        // Deleting an unknown group must fail.
+        try {
+            resourcegroups.deleteResourceGroup(unknownGroup);
+            fail("should have failed");
+        } catch (RestException expected) {
+            // expected
+        }
+
+        // Clean up everything this test created.
+        for (String createdName : expectedRgNames) {
+            resourcegroups.deleteResourceGroup(createdName);
+        }
+    }
+
+    /**
+     * Verifies the namespace/resource-group association: an unknown group cannot be attached, an
+     * attached group cannot be deleted, and a detached group can.
+     */
+    @Test
+    public void testNamespaceResourceGroup() throws Exception {
+        // create resourcegroup with non default values.
+        ResourceGroup testResourceGroupTwo = new ResourceGroup();
+        testResourceGroupTwo.setDispatchRateInBytes(10000);
+        testResourceGroupTwo.setDispatchRateInMsgs(100);
+        testResourceGroupTwo.setPublishRateInMsgs(100);
+        testResourceGroupTwo.setPublishRateInBytes(10000);
+
+        resourcegroups.createOrUpdateResourceGroup("test-resourcegroup-three", testResourceGroupTwo);
+        admin.namespaces().createNamespace(testNameSpace);
+        // set invalid ResourceGroup in namespace
+        try {
+            admin.namespaces().setNamespaceResourceGroup(testNameSpace, "test-resourcegroup-invalid");
+            fail("should have failed");
+        } catch (Exception e){
+            //Ok.
+        }
+        // set resourcegroup in namespace
+        admin.namespaces().setNamespaceResourceGroup(testNameSpace, "test-resourcegroup-three");
+        // try deleting the resourcegroup, should fail
+        try {
+            resourcegroups.deleteResourceGroup("test-resourcegroup-three");
+            // Without this the check only failed indirectly, through the second delete further down.
+            fail("should have failed");
+        } catch (RestException e) {
+            //Ok
+        }
+        // remove resourcegroup from namespace
+        admin.namespaces().removeNamespaceResourceGroup(testNameSpace);
+        resourcegroups.deleteResourceGroup("test-resourcegroup-three");
+
+    }
+
+    /** Creates the cluster and the tenant (allowed on that cluster) that the tests rely on. */
+    private void prepareData() throws PulsarAdminException {
+        admin.clusters().createCluster(testCluster, new ClusterData(pulsar.getBrokerServiceUrl()));
+        admin.tenants().createTenant(testTenant,
+                new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet(testCluster)));
+    }
+
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 703269a..9d43fc6 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -3897,4 +3897,95 @@ public interface Namespaces {
      *              Unexpected error
      */
     CompletableFuture<Void> removeMaxTopicsPerNamespaceAsync(String namespace);
+
+    /**
+     * Get the ResourceGroup for a namespace.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>resourcegroup-name</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     * @return the ResourceGroup name set on the namespace
+     */
+    String getNamespaceResourceGroup(String namespace) throws PulsarAdminException;
+
+    /**
+     * Get the ResourceGroup for a namespace asynchronously.
+     * <p/>
+     * Response example:
+     *
+     * <pre>
+     * <code>resourcegroup-name</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<String> getNamespaceResourceGroupAsync(String namespace);
+
+    /**
+     * Set the ResourceGroup for a namespace.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>resourcegroup-name</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param resourcegroupname
+     *            ResourceGroup name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setNamespaceResourceGroup(String namespace, String resourcegroupname) throws PulsarAdminException;
+
+    /**
+     * Set the ResourceGroup for a namespace asynchronously.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>resourcegroup-name</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param resourcegroupname
+     *            ResourceGroup name
+     */
+    CompletableFuture<Void> setNamespaceResourceGroupAsync(String namespace, String resourcegroupname);
+
+    /**
+     * Remove the ResourceGroup on a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeNamespaceResourceGroup(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove the ResourceGroup on a namespace asynchronously.
+     * @param namespace
+     * @return a future that completes once the removal request has finished
+     */
+    CompletableFuture<Void> removeNamespaceResourceGroupAsync(String namespace);
+
+
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 58b4fd2..ed5e13d 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -54,6 +54,11 @@ public interface PulsarAdmin extends Closeable {
     Tenants tenants();
 
     /**
+     * @return the resourcegroups management object
+     */
+    ResourceGroups resourcegroups();
+
+    /**
      *
      * @deprecated since 2.0. See {@link #tenants()}
      */
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java
new file mode 100644
index 0000000..bbed3db
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ResourceGroups.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+/**
+ * Admin interface for ResourceGroups management.
+ */
+
+public interface ResourceGroups {
+
+        /**
+         * Get the list of resourcegroups.
+         * <p/>
+         * Get the list of all the resourcegroups.
+         * <p/>
+         * Response Example:
+         *
+         * <pre>
+         * <code>["resourcegroup1",
+         *  "resourcegroup2",
+         *  "resourcegroup3"]</code>
+         * </pre>
+         *
+         * @throws PulsarAdminException.NotAuthorizedException Don't have admin permission
+         * @throws PulsarAdminException                        Unexpected error
+         */
+        List<String> getResourceGroups() throws PulsarAdminException;
+
+        /**
+         * Get the list of resourcegroups asynchronously.
+         * <p/>
+         * Get the list of all the resourcegroups.
+         * <p/>
+         * Response Example:
+         *
+         * <pre>
+         * <code>["resourcegroup1",
+         *  "resourcegroup2",
+         *  "resourcegroup3"]</code>
+         * </pre>
+         */
+        CompletableFuture<List<String>> getResourceGroupsAsync();
+
+
+        /**
+         * Get configuration for a resourcegroup.
+         * <p/>
+         * Get configuration specified for a resourcegroup.
+         * <p/>
+         * Response Example:
+         *
+         * <pre>
+         * <code>
+         *     "publishRateInMsgs" : "value",
+         *     "publishRateInBytes" : "value",
+         *     "dispatchRateInMsgs" : "value",
+         *     "dispatchRateInBytes" : "value"
+         * </code>
+         * </pre>
+         *
+         * @param resourcegroup String resourcegroup
+         * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+         * @throws PulsarAdminException.NotFoundException      Resourcegroup does not exist
+         * @throws PulsarAdminException                        Unexpected error
+         * @see ResourceGroup
+         * <p>
+         * *
+         */
+        ResourceGroup getResourceGroup(String resourcegroup) throws PulsarAdminException;
+
+        /**
+         * Get configuration for a resourcegroup asynchronously.
+         * <p/>
+         * Get configuration specified for a resourcegroup.
+         * <p/>
+         * Response example:
+         *
+         * <pre>
+         * <code>
+         *     "publishRateInMsgs" : "value",
+         *     "publishRateInBytes" : "value",
+         *     "dispatchRateInMsgs" : "value",
+         *     "dispatchRateInBytes" : "value"
+         * </code>
+         * </pre>
+         *
+         * @param resourcegroup Resourcegroup name
+         * @see ResourceGroup
+         */
+        CompletableFuture<ResourceGroup> getResourceGroupAsync(String resourcegroup);
+
+        /**
+         * Create a new resourcegroup.
+         * <p/>
+         * Creates a new resourcegroup with the configuration specified.
+         *
+         * @param name       resourcegroup name
+         * @param resourcegroup ResourceGroup configuration
+         * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+         * @throws PulsarAdminException.ConflictException      Resourcegroup already exists
+         * @throws PulsarAdminException                        Unexpected error
+         */
+        void createResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException;
+
+        /**
+         * Create a new resourcegroup asynchronously.
+         * <p/>
+         * Creates a new resourcegroup with the configuration specified.
+         *
+         * @param name           resourcegroup name
+         * @param resourcegroup ResourceGroup configuration.
+         */
+        CompletableFuture<Void> createResourceGroupAsync(String name, ResourceGroup resourcegroup);
+
+        /**
+         * Update the configuration for a ResourceGroup.
+         * <p/>
+         * This operation requires Pulsar super-user privileges.
+         *
+         * @param name          resourcegroup name
+         * @param resourcegroup resourcegroup configuration
+         *
+         * @throws PulsarAdminException.NotAuthorizedException
+         *             Don't have admin permission
+         * @throws PulsarAdminException.NotFoundException
+         *             ResourceGroup does not exist
+         * @throws PulsarAdminException
+         *             Unexpected error
+         */
+        void updateResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException;
+
+        /**
+         * Update the configuration for a ResourceGroup asynchronously.
+         * <p/>
+         * This operation requires Pulsar super-user privileges.
+         *
+         * @param name          resourcegroup name
+         * @param resourcegroup resourcegroup configuration
+         */
+        CompletableFuture<Void> updateResourceGroupAsync(String name, ResourceGroup resourcegroup);
+
+
+        /**
+         * Delete an existing resourcegroup.
+         * <p/>
+         * The resourcegroup needs to be unused and not attached to any entity.
+         *
+         * @param resourcegroup Resourcegroup name
+         * @throws PulsarAdminException.NotAuthorizedException You don't have admin permission
+         * @throws PulsarAdminException.NotFoundException      Resourcegroup does not exist
+         * @throws PulsarAdminException.ConflictException      Resourcegroup is in use
+         * @throws PulsarAdminException                        Unexpected error
+         */
+        void deleteResourceGroup(String resourcegroup) throws PulsarAdminException;
+
+        /**
+         * Delete an existing resourcegroup asynchronously.
+         * <p/>
+         * The resourcegroup needs to be unused and not attached to any entity.
+         *
+         * @param resourcegroup Resourcegroup name
+         */
+
+        CompletableFuture<Void> deleteResourceGroupAsync(String resourcegroup);
+
+}
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index e110645..da0738a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -3365,6 +3365,87 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         return asyncDeleteRequest(path);
     }
 
+
+    @Override
+    public String getNamespaceResourceGroup(String namespace) throws PulsarAdminException {
+        try {
+            return getNamespaceResourceGroupAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<String> getNamespaceResourceGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "resourcegroup");
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<String>() {
+                    @Override
+                    public void completed(String rgName) {
+                        future.complete(rgName);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void setNamespaceResourceGroup(String namespace, String resourcegroupname) throws PulsarAdminException {
+        try {
+            setNamespaceResourceGroupAsync(namespace, resourcegroupname)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> setNamespaceResourceGroupAsync(String namespace, String resourcegroupname) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "resourcegroup");
+        return asyncPostRequest(path, Entity.entity(resourcegroupname, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void removeNamespaceResourceGroup(String namespace) throws PulsarAdminException {
+        try {
+            removeNamespaceResourceGroupAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeNamespaceResourceGroupAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "resourcegroup");
+        return asyncDeleteRequest(path);
+    }
+
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
index f3ae151..8874902 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.admin.Packages;
 import org.apache.pulsar.client.admin.Properties;
 import org.apache.pulsar.client.admin.ProxyStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.ResourceGroups;
 import org.apache.pulsar.client.admin.ResourceQuotas;
 import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.admin.Sink;
@@ -81,6 +82,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
     private final BrokerStats brokerStats;
     private final ProxyStats proxyStats;
     private final Tenants tenants;
+    private final ResourceGroups resourcegroups;
     private final Properties properties;
     private final Namespaces namespaces;
     private final Bookies bookies;
@@ -205,6 +207,7 @@ public class PulsarAdminImpl implements PulsarAdmin {
         this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs);
         this.proxyStats = new ProxyStatsImpl(root, auth, readTimeoutMs);
         this.tenants = new TenantsImpl(root, auth, readTimeoutMs);
+        this.resourcegroups = new ResourceGroupsImpl(root, auth, readTimeoutMs);
         this.properties = new TenantsImpl(root, auth, readTimeoutMs);
         this.namespaces = new NamespacesImpl(root, auth, readTimeoutMs);
         this.topics = new TopicsImpl(root, auth, readTimeoutMs);
@@ -306,6 +309,13 @@ public class PulsarAdminImpl implements PulsarAdmin {
     }
 
     /**
+     * @return the resourcegroups management object
+     */
+    public ResourceGroups resourcegroups() {
+        return resourcegroups;
+    }
+
+    /**
      *
      * @deprecated since 2.0. See {@link #tenants()}
      */
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
new file mode 100644
index 0000000..b099087
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ResourceGroupsImpl.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.ResourceGroups;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+
+public class ResourceGroupsImpl extends BaseResource implements ResourceGroups {
+    private final WebTarget adminResourceGroups;
+
+    public ResourceGroupsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
+        super(auth, readTimeoutMs);
+        adminResourceGroups = web.path("/admin/v2/resourcegroups");
+    }
+
+    @Override
+    public List<String> getResourceGroups() throws PulsarAdminException {
+        try {
+            return getResourceGroupsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<List<String>> getResourceGroupsAsync() {
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(adminResourceGroups,
+                new InvocationCallback<List<String>>() {
+                    @Override
+                    public void completed(List<String> resourcegroups) {
+                        future.complete(resourcegroups);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public ResourceGroup getResourceGroup(String resourcegroup) throws PulsarAdminException {
+        try {
+            return getResourceGroupAsync(resourcegroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ResourceGroup> getResourceGroupAsync(String name) {
+        WebTarget path = adminResourceGroups.path(name);
+        final CompletableFuture<ResourceGroup> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<ResourceGroup>() {
+                    @Override
+                    public void completed(ResourceGroup resourcegroup) {
+                        future.complete(resourcegroup);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
+    public void createResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException {
+        try {
+            createResourceGroupAsync(name, resourcegroup)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createResourceGroupAsync(String name, ResourceGroup resourcegroup) {
+        WebTarget path = adminResourceGroups.path(name);
+        return asyncPutRequest(path, Entity.entity(resourcegroup, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void updateResourceGroup(String name, ResourceGroup resourcegroup) throws PulsarAdminException {
+        try {
+            updateResourceGroupAsync(name, resourcegroup).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> updateResourceGroupAsync(String name, ResourceGroup resourcegroup) {
+        WebTarget path = adminResourceGroups.path(name);
+        return asyncPutRequest(path, Entity.entity(resourcegroup, MediaType.APPLICATION_JSON));
+    }
+
+    @Override
+    public void deleteResourceGroup(String name) throws PulsarAdminException {
+        try {
+            deleteResourceGroupAsync(name).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteResourceGroupAsync(String name) {
+        WebTarget path = adminResourceGroups.path(name);
+        return asyncDeleteRequest(path);
+    }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 7a2e52d..0afe299 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -2086,6 +2086,45 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get ResourceGroup for a namespace")
+    private class GetResourceGroup extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(getAdmin().namespaces().getNamespaceResourceGroup(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set ResourceGroup for a namespace")
+    private class SetResourceGroup extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--resource-group-name", "-rgn" }, description = "ResourceGroup name", required = true)
+        private String rgName;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().setNamespaceResourceGroup(namespace, rgName);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove ResourceGroup from a namespace")
+    private class RemoveResourceGroup extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().removeNamespaceResourceGroup(namespace);
+        }
+    }
+
     public CmdNamespaces(Supplier<PulsarAdmin> admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -2246,5 +2285,10 @@ public class CmdNamespaces extends CmdBase {
         jcommander.addCommand("set-max-topics-per-namespace", new SetMaxTopicsPerNamespace());
         jcommander.addCommand("get-max-topics-per-namespace", new GetMaxTopicsPerNamespace());
         jcommander.addCommand("remove-max-topics-per-namespace", new RemoveMaxTopicsPerNamespace());
+
+        jcommander.addCommand("get-resource-group", new GetResourceGroup());
+        jcommander.addCommand("set-resource-group", new SetResourceGroup());
+        jcommander.addCommand("remove-resource-group", new RemoveResourceGroup());
+
     }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java
new file mode 100644
index 0000000..74ac764
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdResourceGroups.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ResourceGroup;
+
+import java.util.function.Supplier;
+
+@Parameters(commandDescription = "Operations about ResourceGroups")
+public class CmdResourceGroups extends CmdBase {
+    @Parameters(commandDescription = "List the existing resourcegroups")
+    private class List extends CliCommand {
+        @Override
+        void run() throws PulsarAdminException {
+            print(getAdmin().resourcegroups().getResourceGroups());
+        }
+    }
+
+    @Parameters(commandDescription = "Gets the configuration of a resourcegroup")
+    private class Get extends CliCommand {
+        @Parameter(description = "resourcegroup-name", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String name = getOneArgument(params);
+            print(getAdmin().resourcegroups().getResourceGroup(name));
+        }
+    }
+    @Parameters(commandDescription = "Creates a new resourcegroup")
+    private class Create extends CliCommand {
+        @Parameter(description = "resourcegroup-name", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-publish-rate",
+                "-mp" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int publishRateInMsgs = -1;
+
+        @Parameter(names = { "--byte-publish-rate",
+                "-bp" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long publishRateInBytes = -1;
+
+
+        @Parameter(names = { "--msg-dispatch-rate",
+                "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int dispatchRateInMsgs = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+                "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long dispatchRateInBytes = -1;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String name = getOneArgument(params);
+
+            ResourceGroup resourcegroup = new ResourceGroup();
+            resourcegroup.setDispatchRateInMsgs(dispatchRateInMsgs);
+            resourcegroup.setDispatchRateInBytes(dispatchRateInBytes);
+            resourcegroup.setPublishRateInMsgs(publishRateInMsgs);
+            resourcegroup.setPublishRateInBytes(publishRateInBytes);
+            getAdmin().resourcegroups().createResourceGroup(name, resourcegroup);
+        }
+    }
+
+    @Parameters(commandDescription = "Updates a resourcegroup")
+    private class Update extends CliCommand {
+        @Parameter(description = "resourcegroup-name", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-publish-rate",
+                "-mp" }, description = "message-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int publishRateInMsgs = -1;
+
+        @Parameter(names = { "--byte-publish-rate",
+                "-bp" }, description = "byte-publish-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long publishRateInBytes = -1;
+
+
+        @Parameter(names = { "--msg-dispatch-rate",
+                "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private int dispatchRateInMsgs = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+                "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)\n", required = false)
+        private long dispatchRateInBytes = -1;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String name = getOneArgument(params);
+
+            ResourceGroup resourcegroup = new ResourceGroup();
+            resourcegroup.setDispatchRateInMsgs(dispatchRateInMsgs);
+            resourcegroup.setDispatchRateInBytes(dispatchRateInBytes);
+            resourcegroup.setPublishRateInMsgs(publishRateInMsgs);
+            resourcegroup.setPublishRateInBytes(publishRateInBytes);
+
+            getAdmin().resourcegroups().updateResourceGroup(name, resourcegroup);
+        }
+    }
+
+    @Parameters(commandDescription = "Deletes an existing ResourceGroup")
+    private class Delete extends CliCommand {
+        @Parameter(description = "resourcegroup-name", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String name = getOneArgument(params);
+            getAdmin().resourcegroups().deleteResourceGroup(name);
+        }
+    }
+
+
+    public CmdResourceGroups(Supplier<PulsarAdmin> admin) {
+        super("resourcegroups", admin);
+        jcommander.addCommand("list", new CmdResourceGroups.List());
+        jcommander.addCommand("get", new CmdResourceGroups.Get());
+        jcommander.addCommand("create", new CmdResourceGroups.Create());
+        jcommander.addCommand("update", new CmdResourceGroups.Update());
+        jcommander.addCommand("delete", new CmdResourceGroups.Delete());
+    }
+
+
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index d50f199..a62761b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -126,6 +126,7 @@ public class PulsarAdminTool {
         commandMap.put("brokers", CmdBrokers.class);
         commandMap.put("broker-stats", CmdBrokerStats.class);
         commandMap.put("tenants", CmdTenants.class);
+        commandMap.put("resourcegroups", CmdResourceGroups.class);
         commandMap.put("properties", CmdTenants.CmdProperties.class); // deprecated, doesn't show in usage()
         commandMap.put("namespaces", CmdNamespaces.class);
         commandMap.put("topics", CmdTopics.class);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 72d6f6f..c9ae529 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -124,6 +124,9 @@ public class Policies {
 
     public Set<SubType> subscription_types_enabled = Sets.newHashSet();
 
+    @SuppressWarnings("checkstyle:MemberName")
+    public String resource_group_name = null; // stays null until a resource group name is assigned
+
     @Override
     public int hashCode() {
         return Objects.hash(auth_policies, replication_clusters,
@@ -145,7 +148,8 @@ public class Policies {
                 schema_compatibility_strategy,
                 is_allow_auto_update_schema,
                 offload_policies,
-                subscription_types_enabled);
+                subscription_types_enabled,
+                resource_group_name);
     }
 
     @Override
@@ -187,7 +191,8 @@ public class Policies {
                     && schema_compatibility_strategy == other.schema_compatibility_strategy
                     && is_allow_auto_update_schema == other.is_allow_auto_update_schema
                     && Objects.equals(offload_policies, other.offload_policies)
-                    && Objects.equals(subscription_types_enabled, other.subscription_types_enabled);
+                    && Objects.equals(subscription_types_enabled, other.subscription_types_enabled)
+                    && Objects.equals(resource_group_name, other.resource_group_name);
         }
 
         return false;
@@ -247,7 +252,8 @@ public class Policies {
                 .add("schema_compatibility_Strategy", schema_compatibility_strategy)
                 .add("is_allow_auto_update_Schema", is_allow_auto_update_schema)
                 .add("offload_policies", offload_policies)
-                .add("subscription_types_enabled", subscription_types_enabled).toString();
+                .add("subscription_types_enabled", subscription_types_enabled)
+                .add("resource_group_name", resource_group_name).toString();
     }
 
     private static final long MAX_BUNDLES = ((long) 1) << 32;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
index 81cfb2a..7172d50 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
@@ -45,5 +45,6 @@ public enum PolicyName {
     SUBSCRIPTION_AUTH_MODE,
     ENCRYPTION,
     TTL,
-    MAX_TOPICS
+    MAX_TOPICS,
+    RESOURCEGROUP
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
similarity index 63%
copy from pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
index 81cfb2a..046c2d2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PolicyName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ResourceGroup.java
@@ -16,34 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.pulsar.common.policies.data;
 
-/**
- * PolicyName authorization operations.
- */
-public enum PolicyName {
-    ALL,
-    ANTI_AFFINITY,
-    BACKLOG,
-    COMPACTION,
-    DELAYED_DELIVERY,
-    INACTIVE_TOPIC,
-    DEDUPLICATION,
-    MAX_CONSUMERS,
-    MAX_PRODUCERS,
-    DEDUPLICATION_SNAPSHOT,
-    MAX_UNACKED,
-    MAX_SUBSCRIPTIONS,
-    OFFLOAD,
-    PERSISTENCE,
-    RATE,
-    RETENTION,
-    REPLICATION,
-    REPLICATION_RATE,
-    SCHEMA_COMPATIBILITY_STRATEGY,
-    SUBSCRIPTION_AUTH_MODE,
-    ENCRYPTION,
-    TTL,
-    MAX_TOPICS
+import com.google.common.base.MoreObjects;
+import lombok.Data;
+
+import java.util.Objects;
+
+@Data public class ResourceGroup { // NOTE(review): -1 presumably means "not set" -- confirm
+    private int publishRateInMsgs = -1; // publish rate counted in messages; starts at -1
+    private long publishRateInBytes = -1L; // publish rate counted in bytes; starts at -1
+    private int dispatchRateInMsgs = -1; // dispatch rate counted in messages; starts at -1
+    private long dispatchRateInBytes = -1L; // dispatch rate counted in bytes; starts at -1
+}