You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/10/11 05:37:32 UTC

[apisix] branch master updated: feat: add consumer group (#7980)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e2cc4f34 feat: add consumer group (#7980)
7e2cc4f34 is described below

commit 7e2cc4f347e8fcd002f5afc52120e573757d50ed
Author: jinhua luo <ho...@163.com>
AuthorDate: Tue Oct 11 13:37:24 2022 +0800

    feat: add consumer group (#7980)
---
 apisix/admin/consumer_group.lua              | 195 ++++++++++
 apisix/admin/consumers.lua                   |  16 +
 apisix/admin/init.lua                        |   1 +
 apisix/constants.lua                         |   1 +
 apisix/consumer.lua                          |   1 +
 apisix/{constants.lua => consumer_group.lua} |  56 +--
 apisix/core/ctx.lua                          |   1 +
 apisix/init.lua                              |  14 +
 apisix/plugin.lua                            |  33 +-
 apisix/schema_def.lua                        |  15 +
 docs/en/latest/admin-api.md                  |  31 ++
 docs/en/latest/apisix-variable.md            |   1 +
 docs/en/latest/config.json                   |   1 +
 docs/en/latest/terminology/consumer-group.md | 100 +++++
 docs/en/latest/terminology/plugin.md         |   2 +-
 docs/zh/latest/admin-api.md                  |  31 ++
 docs/zh/latest/apisix-variable.md            |   1 +
 t/admin/consumer-group.t                     | 549 +++++++++++++++++++++++++++
 t/config-center-yaml/consumer-group.t        | 140 +++++++
 t/node/consumer-group.t                      | 312 +++++++++++++++
 20 files changed, 1471 insertions(+), 30 deletions(-)

diff --git a/apisix/admin/consumer_group.lua b/apisix/admin/consumer_group.lua
new file mode 100644
index 000000000..6e1e41df4
--- /dev/null
+++ b/apisix/admin/consumer_group.lua
@@ -0,0 +1,195 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local consumers = require("apisix.consumer").consumers
+local utils = require("apisix.admin.utils")
+local schema_plugin = require("apisix.admin.plugins").check_schema
+local type = type
+local tostring = tostring
+local ipairs = ipairs
+
+
+local _M = {
+    need_v3_filter = true,
+}
+
+
+-- Validate a consumer group configuration before it is stored.
+--
+-- @param id       id taken from the request URI (may be nil)
+-- @param conf     decoded request body; on success `conf.id` holds the
+--                 effective id
+-- @param need_id  whether the calling operation requires an id
+-- @return true on success; otherwise nil and a table `{error_msg = ...}`
+local function check_conf(id, conf, need_id)
+    if not conf then
+        return nil, {error_msg = "missing configurations"}
+    end
+
+    -- the id may come from the URI or, failing that, from the body
+    id = id or conf.id
+
+    if need_id then
+        if not id then
+            return nil, {error_msg = "missing id"}
+        end
+
+        -- an id given in both places must agree (compared as strings)
+        if conf.id and tostring(conf.id) ~= tostring(id) then
+            return nil, {error_msg = "wrong id"}
+        end
+    elseif id then
+        return nil, {error_msg = "wrong id, do not need it"}
+    end
+
+    conf.id = id
+
+    core.log.info("conf: ", core.json.delay_encode(conf))
+
+    local valid, schema_err = core.schema.check(core.schema.consumer_group, conf)
+    if not valid then
+        return nil, {error_msg = "invalid configuration: " .. schema_err}
+    end
+
+    local plugins_ok, plugins_err = schema_plugin(conf.plugins)
+    if not plugins_ok then
+        return nil, {error_msg = plugins_err}
+    end
+
+    return true
+end
+
+
+-- Create or replace the consumer group stored under `/consumer_groups/<id>`.
+--
+-- @param id    id from the request URI; may be nil when the id is supplied
+--              only in the request body
+-- @param conf  decoded request body
+-- @return HTTP status code and response body (or `{error_msg = ...}`)
+function _M.put(id, conf)
+    local ok, err = check_conf(id, conf, true)
+    if not ok then
+        return 400, err
+    end
+
+    -- check_conf() accepts an id that is present only in the body and stores
+    -- the effective id in conf.id. Build the key from conf.id: concatenating
+    -- the raw `id` argument would raise an error when it is nil.
+    local key = "/consumer_groups/" .. conf.id
+
+    local ok, err = utils.inject_conf_with_prev_conf("consumer_group", key, conf)
+    if not ok then
+        return 503, {error_msg = err}
+    end
+
+    local res, err = core.etcd.set(key, conf)
+    if not res then
+        core.log.error("failed to put consumer group[", key, "]: ", err)
+        return 503, {error_msg = err}
+    end
+
+    return res.status, res.body
+end
+
+
+-- Fetch one consumer group by id, or the whole `/consumer_groups` directory
+-- when no id is given.
+--
+-- @param id  optional consumer group id
+-- @return HTTP status code and response body (or `{error_msg = ...}`)
+function _M.get(id)
+    local key = id and ("/consumer_groups/" .. id) or "/consumer_groups"
+
+    -- second argument is true only for the directory (no id) case
+    local resp, fetch_err = core.etcd.get(key, not id)
+    if not resp then
+        core.log.error("failed to get consumer group[", key, "]: ", fetch_err)
+        return 503, {error_msg = fetch_err}
+    end
+
+    utils.fix_count(resp.body, id)
+    return resp.status, resp.body
+end
+
+
+-- Delete a consumer group, refusing while any loaded consumer still
+-- references it through `group_id`.
+--
+-- @param id  consumer group id from the request URI
+-- @return HTTP status code and response body (or `{error_msg = ...}`)
+function _M.delete(id)
+    if not id then
+        return 400, {error_msg = "missing consumer group id"}
+    end
+
+    local items, version = consumers()
+    if items and version then
+        for _, item in ipairs(items) do
+            local value = type(item) == "table" and item.value
+            -- group_id is compared as a string against the URI id
+            if value and value.group_id
+               and tostring(value.group_id) == id then
+                local msg = "can not delete this consumer group,"
+                            .. " consumer [" .. value.id
+                            .. "] is still using it now"
+                return 400, {error_msg = msg}
+            end
+        end
+    end
+
+    local key = "/consumer_groups/" .. id
+    local resp, del_err = core.etcd.delete(key)
+    if not resp then
+        core.log.error("failed to delete consumer group[", key, "]: ", del_err)
+        return 503, {error_msg = del_err}
+    end
+
+    return resp.status, resp.body
+end
+
+
+-- Partially update an existing consumer group.
+--
+-- With a non-empty `sub_path` the attribute at that path is replaced via
+-- core.table.patch; otherwise `conf` is merged into the stored value via
+-- core.table.merge. The result is re-validated and then written with
+-- core.etcd.atomic_set using the modifiedIndex read beforehand.
+--
+-- @param id        consumer group id from the request URI
+-- @param conf      new configuration (a table for a whole-object patch)
+-- @param sub_path  optional path of the single attribute to replace
+-- @return HTTP status code and response body (or `{error_msg = ...}`)
+function _M.patch(id, conf, sub_path)
+    if not id then
+        return 400, {error_msg = "missing consumer group id"}
+    end
+
+    if not conf then
+        return 400, {error_msg = "missing new configuration"}
+    end
+
+    local has_sub_path = sub_path and sub_path ~= ""
+
+    -- a whole-object patch merges tables, so the body must be a table
+    if not has_sub_path and type(conf) ~= "table" then
+        return 400, {error_msg = "invalid configuration"}
+    end
+
+    local key = "/consumer_groups/" .. id
+    local stored, get_err = core.etcd.get(key)
+    if not stored then
+        core.log.error("failed to get consumer group [", key, "]: ", get_err)
+        return 503, {error_msg = get_err}
+    end
+
+    -- pass through any non-200 answer from the lookup unchanged
+    if stored.status ~= 200 then
+        return stored.status, stored.body
+    end
+    core.log.info("key: ", key, " old value: ",
+                  core.json.delay_encode(stored, true))
+
+    local node = stored.body.node
+    local new_value = node.value
+    local modified_index = node.modifiedIndex
+
+    if has_sub_path then
+        local code, patch_err, patched = core.table.patch(new_value, sub_path, conf)
+        new_value = patched
+        if code then
+            return code, patch_err
+        end
+        utils.inject_timestamp(new_value, nil, true)
+    else
+        new_value = core.table.merge(new_value, conf)
+        utils.inject_timestamp(new_value, nil, conf)
+    end
+
+    core.log.info("new conf: ", core.json.delay_encode(new_value, true))
+
+    local valid, check_err = check_conf(id, new_value, true)
+    if not valid then
+        return 400, check_err
+    end
+
+    local resp, set_err = core.etcd.atomic_set(key, new_value, nil, modified_index)
+    if not resp then
+        core.log.error("failed to set new consumer group[", key, "]: ", set_err)
+        return 503, {error_msg = set_err}
+    end
+
+    return resp.status, resp.body
+end
+
+
+return _M
diff --git a/apisix/admin/consumers.lua b/apisix/admin/consumers.lua
index 77416dbb0..7ab0ec1e2 100644
--- a/apisix/admin/consumers.lua
+++ b/apisix/admin/consumers.lua
@@ -62,6 +62,22 @@ local function check_conf(username, conf)
         end
     end
 
+    if conf.group_id then
+        local key = "/consumer_groups/" .. conf.group_id
+        local res, err = core.etcd.get(key)
+        if not res then
+            return nil, {error_msg = "failed to fetch consumer group info by "
+                                     .. "consumer group id [" .. conf.group_id .. "]: "
+                                     .. err}
+        end
+
+        if res.status ~= 200 then
+            return nil, {error_msg = "failed to fetch consumer group info by "
+                                     .. "consumer group id [" .. conf.group_id .. "], "
+                                     .. "response code: " .. res.status}
+        end
+    end
+
     return conf.username
 end
 
diff --git a/apisix/admin/init.lua b/apisix/admin/init.lua
index bdf19da38..5827f8397 100644
--- a/apisix/admin/init.lua
+++ b/apisix/admin/init.lua
@@ -54,6 +54,7 @@ local resources = {
     stream_routes   = require("apisix.admin.stream_routes"),
     plugin_metadata = require("apisix.admin.plugin_metadata"),
     plugin_configs  = require("apisix.admin.plugin_config"),
+    consumer_groups  = require("apisix.admin.consumer_group"),
 }
 
 
diff --git a/apisix/constants.lua b/apisix/constants.lua
index 1c82ec3d4..9f9b62fc3 100644
--- a/apisix/constants.lua
+++ b/apisix/constants.lua
@@ -32,6 +32,7 @@ return {
         ["/global_rules"] = true,
         ["/protos"] = true,
         ["/plugin_configs"] = true,
+        ["/consumer_groups"] = true,
     },
     STREAM_ETCD_DIRECTORY = {
         ["/upstreams"] = true,
diff --git a/apisix/consumer.lua b/apisix/consumer.lua
index 5e25b7521..2eaf67d4e 100644
--- a/apisix/consumer.lua
+++ b/apisix/consumer.lua
@@ -80,6 +80,7 @@ end
 function _M.attach_consumer(ctx, consumer, conf)
     ctx.consumer = consumer
     ctx.consumer_name = consumer.consumer_name
+    ctx.consumer_group_id = consumer.group_id
     ctx.consumer_ver = conf.conf_version
 end
 
diff --git a/apisix/constants.lua b/apisix/consumer_group.lua
similarity index 53%
copy from apisix/constants.lua
copy to apisix/consumer_group.lua
index 1c82ec3d4..8c17bdd71 100644
--- a/apisix/constants.lua
+++ b/apisix/consumer_group.lua
@@ -14,30 +14,34 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-return {
-    RPC_ERROR = 0,
-    RPC_PREPARE_CONF = 1,
-    RPC_HTTP_REQ_CALL = 2,
-    RPC_EXTRA_INFO = 3,
-    RPC_HTTP_RESP_CALL = 4,
-    HTTP_ETCD_DIRECTORY = {
-        ["/upstreams"] = true,
-        ["/plugins"] = true,
-        ["/ssls"] = true,
-        ["/stream_routes"] = true,
-        ["/plugin_metadata"] = true,
-        ["/routes"] = true,
-        ["/services"] = true,
-        ["/consumers"] = true,
-        ["/global_rules"] = true,
-        ["/protos"] = true,
-        ["/plugin_configs"] = true,
-    },
-    STREAM_ETCD_DIRECTORY = {
-        ["/upstreams"] = true,
-        ["/plugins"] = true,
-        ["/ssls"] = true,
-        ["/stream_routes"] = true,
-        ["/plugin_metadata"] = true,
-    },
+local core = require("apisix.core")
+local plugin_checker = require("apisix.plugin").plugin_checker
+local error = error
+
+
+local consumer_groups
+
+
+local _M = {
 }
+
+
+-- Create the config object for `/consumer_groups` and keep it in the
+-- module-level `consumer_groups` variable. Raises an error when the config
+-- object cannot be created.
+function _M.init_worker()
+    local opts = {
+        automatic = true,
+        item_schema = core.schema.consumer_group,
+        checker = plugin_checker,
+    }
+
+    local groups, err = core.config.new("/consumer_groups", opts)
+    consumer_groups = groups
+    if not groups then
+        error("failed to sync /consumer_groups: " .. err)
+    end
+end
+
+
+-- Look up a consumer group by id in the config object created by
+-- init_worker(), returning whatever `consumer_groups:get(id)` returns.
+-- NOTE(review): `consumer_groups` is nil until init_worker() has run, so
+-- calling this earlier would raise an error.
+function _M.get(id)
+    return consumer_groups:get(id)
+end
+
+
+return _M
diff --git a/apisix/core/ctx.lua b/apisix/core/ctx.lua
index 5bf3daa57..9b589b6b3 100644
--- a/apisix/core/ctx.lua
+++ b/apisix/core/ctx.lua
@@ -206,6 +206,7 @@ do
     local apisix_var_names = {
         balancer_ip = true,
         balancer_port = true,
+        consumer_group_id = true,
         consumer_name = true,
         route_id = true,
         route_name = true,
diff --git a/apisix/init.lua b/apisix/init.lua
index 3100cc1d2..3eb68765b 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -30,6 +30,7 @@ local core            = require("apisix.core")
 local conf_server     = require("apisix.conf_server")
 local plugin          = require("apisix.plugin")
 local plugin_config   = require("apisix.plugin_config")
+local consumer_group  = require("apisix.consumer_group")
 local script          = require("apisix.script")
 local service_fetch   = require("apisix.http.service").get
 local admin_init      = require("apisix.admin.init")
@@ -147,6 +148,7 @@ function _M.http_init_worker()
     require("apisix.http.service").init_worker()
     plugin_config.init_worker()
     require("apisix.consumer").init_worker()
+    consumer_group.init_worker()
 
     apisix_upstream.init_worker()
     require("apisix.plugins.ext-plugin.init").init_worker()
@@ -446,9 +448,21 @@ function _M.http_access_phase()
         plugin.run_plugin("rewrite", plugins, api_ctx)
         if api_ctx.consumer then
             local changed
+            local group_conf
+
+            if api_ctx.consumer.group_id then
+                group_conf = consumer_group.get(api_ctx.consumer.group_id)
+                if not group_conf then
+                    core.log.error("failed to fetch consumer group config by ",
+                        "id: ", api_ctx.consumer.group_id)
+                    return core.response.exit(503)
+                end
+            end
+
             route, changed = plugin.merge_consumer_route(
                 route,
                 api_ctx.consumer,
+                group_conf,
                 api_ctx
             )
 
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 7c26ac4d7..e0f2dbdc0 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -626,7 +626,7 @@ function _M.merge_service_route(service_conf, route_conf)
 end
 
 
-local function merge_consumer_route(route_conf, consumer_conf)
+local function merge_consumer_route(route_conf, consumer_conf, consumer_group_conf)
     if not consumer_conf.plugins or
        core.table.nkeys(consumer_conf.plugins) == 0
     then
@@ -635,6 +635,20 @@ local function merge_consumer_route(route_conf, consumer_conf)
     end
 
     local new_route_conf = core.table.deepcopy(route_conf)
+
+    if consumer_group_conf then
+        for name, conf in pairs(consumer_group_conf.value.plugins) do
+            if not new_route_conf.value.plugins then
+                new_route_conf.value.plugins = {}
+            end
+
+            if new_route_conf.value.plugins[name] == nil then
+                conf._from_consumer = true
+            end
+            new_route_conf.value.plugins[name] = conf
+        end
+    end
+
     for name, conf in pairs(consumer_conf.plugins) do
         if not new_route_conf.value.plugins then
             new_route_conf.value.plugins = {}
@@ -651,20 +665,33 @@ local function merge_consumer_route(route_conf, consumer_conf)
 end
 
 
-function _M.merge_consumer_route(route_conf, consumer_conf, api_ctx)
+function _M.merge_consumer_route(route_conf, consumer_conf, consumer_group_conf, api_ctx)
     core.log.info("route conf: ", core.json.delay_encode(route_conf))
     core.log.info("consumer conf: ", core.json.delay_encode(consumer_conf))
+    core.log.info("consumer group conf: ", core.json.delay_encode(consumer_group_conf))
 
     local flag = route_conf.value.id .. "#" .. route_conf.modifiedIndex
                  .. "#" .. consumer_conf.id .. "#" .. consumer_conf.modifiedIndex
+
+    if consumer_group_conf then
+        flag = flag .. "#" .. consumer_group_conf.value.id
+            .. "#" .. consumer_group_conf.modifiedIndex
+    end
+
     local new_conf = merged_route(flag, nil,
-                        merge_consumer_route, route_conf, consumer_conf)
+                        merge_consumer_route, route_conf, consumer_conf, consumer_group_conf)
 
     api_ctx.conf_type = api_ctx.conf_type .. "&consumer"
     api_ctx.conf_version = api_ctx.conf_version .. "&" ..
                            api_ctx.consumer_ver
     api_ctx.conf_id = api_ctx.conf_id .. "&" .. api_ctx.consumer_name
 
+    if consumer_group_conf then
+        api_ctx.conf_type = api_ctx.conf_type .. "&consumer_group"
+        api_ctx.conf_version = api_ctx.conf_version .. "&" .. consumer_group_conf.modifiedIndex
+        api_ctx.conf_id = api_ctx.conf_id .. "&" .. consumer_group_conf.value.id
+    end
+
     return new_conf, new_conf ~= route_conf
 end
 
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index 59e23542d..1ec0ea602 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -698,6 +698,7 @@ _M.consumer = {
             type = "string", minLength = 1, maxLength = rule_name_def.maxLength,
             pattern = [[^[a-zA-Z0-9_]+$]]
         },
+        group_id = id_schema,
         plugins = plugins_schema,
         labels = labels_def,
         create_time = timestamp_def,
@@ -918,6 +919,20 @@ _M.plugin_config = {
 }
 
 
+-- Schema of a consumer group object. `id` and `plugins` are required;
+-- `desc`, `labels`, `create_time` and `update_time` are optional.
+_M.consumer_group = {
+    type = "object",
+    properties = {
+        id = id_schema,
+        desc = desc_def,
+        plugins = plugins_schema,
+        labels = labels_def,
+        create_time = timestamp_def,
+        update_time = timestamp_def
+    },
+    required = {"id", "plugins"},
+}
+
+
 _M.id_schema = id_schema
 
 
diff --git a/docs/en/latest/admin-api.md b/docs/en/latest/admin-api.md
index e8173e198..2bf4d23c1 100644
--- a/docs/en/latest/admin-api.md
+++ b/docs/en/latest/admin-api.md
@@ -105,6 +105,7 @@ $ curl "http://127.0.0.1:9180/apisix/admin/routes?page=1&page_size=10" \
 Resources that support paging queries:
 
 - Consumer
+- Consumer Group
 - Global Rules
 - Plugin Config
 - Proto
@@ -560,6 +561,7 @@ Consumers are users of services and can only be used in conjunction with a user
 | Parameter   | Required | Type        | Description                                                                                                        | Example                                          |
 | ----------- | -------- | ----------- | ------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------ |
 | username    | True     | Name        | Name of the Consumer.                                                                                              |                                                  |
+| group_id    | False    | Name        | Group of the Consumer.                                                                                              |                                                  |
 | plugins     | False    | Plugin      | Plugins that are executed during the request/response cycle. See [Plugin](terminology/plugin.md) for more. |                                                  |
 | desc        | False    | Auxiliary   | Description of usage scenarios.                                                                                    | customer xxxx                                    |
 | labels      | False    | Match Rules | Attributes of the Consumer specified as key-value pairs.                                                           | {"version":"v2","build":"16","env":"production"} |
@@ -947,6 +949,35 @@ Sets Plugins which run globally. i.e these Plugins will be run before any Route/
 | create_time | False    | Epoch timestamp (in seconds) of the created time. If missing, this field will be populated automatically.             | 1602883670 |
 | update_time | False    | Epoch timestamp (in seconds) of the updated time. If missing, this field will be populated automatically.             | 1602883670 |
 
+## Consumer group
+
+**API**: /apisix/admin/consumer_groups/{id}
+
+Group of Plugins which can be reused across Consumers.
+
+### Request Methods
+
+| Method | Request URI                              | Request Body | Description                                                                                                                           |
+| ------ | ---------------------------------------- | ------------ | ------------------------------------------------------------------------------------------------------------------------------------- |
+| GET    | /apisix/admin/consumer_groups             | NULL         | Fetches a list of all Consumer groups.                                                                                                 |
+| GET    | /apisix/admin/consumer_groups/{id}        | NULL         | Fetches specified Consumer group by id.                                                                                                |
+| PUT    | /apisix/admin/consumer_groups/{id}        | {...}        | Creates a new Consumer group with the specified id.                                                                                    |
+| DELETE | /apisix/admin/consumer_groups/{id}        | NULL         | Removes the Consumer group with the specified id.                                                                                      |
+| PATCH  | /apisix/admin/consumer_groups/{id}        | {...}        | Updates the selected attributes of the specified, existing Consumer group. To delete an attribute, set the value of that attribute to null. |
+| PATCH  | /apisix/admin/consumer_groups/{id}/{path} | {...}        | Updates the attribute specified in the path. The values of other attributes remain unchanged.                                         |
+
+### Request Body Parameters
+
+| Parameter   | Required | Description                                                                                                        | Example                                          |
+| ----------- | -------- | ------------------------------------------------------------------------------------------------------------------ | ------------------------------------------------ |
+| plugins     | True     | Plugins that are executed during the request/response cycle. See [Plugin](terminology/plugin.md) for more. |                                                  |
+| desc        | False    | Description of usage scenarios.                                                                                    | customer xxxx                                    |
+| labels      | False    | Attributes of the Consumer group specified as key-value pairs.                                                      | {"version":"v2","build":"16","env":"production"} |
+| create_time | False    | Epoch timestamp (in seconds) of the created time. If missing, this field will be populated automatically.             | 1602883670                                       |
+| update_time | False    | Epoch timestamp (in seconds) of the updated time. If missing, this field will be populated automatically.             | 1602883670                                       |
+
+[Back to TOC](#table-of-contents)
+
 ## Plugin config
 
 **API**: /apisix/admin/plugin_configs/{id}
diff --git a/docs/en/latest/apisix-variable.md b/docs/en/latest/apisix-variable.md
index 9c0f01b5e..939545f04 100644
--- a/docs/en/latest/apisix-variable.md
+++ b/docs/en/latest/apisix-variable.md
@@ -38,6 +38,7 @@ additional variables.
 | balancer_ip         | core       | The IP of picked upstream server.                                                   | 192.168.1.2    |
 | balancer_port       | core       | The port of picked upstream server.                                                 | 80             |
 | consumer_name       | core       | Username of Consumer.                                                               |                |
+| consumer_group_id   | core       | Group ID of Consumer.                                                               |                |
 | graphql_name        | core       | The [operation name](https://graphql.org/learn/queries/#operation-name) of GraphQL. | HeroComparison |
 | graphql_operation   | core       | The operation type of GraphQL.                                                      | mutation       |
 | graphql_root_fields | core       | The top level fields of GraphQL.                                                    | ["hero"]       |
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index 5f75c3249..d895f30e5 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -28,6 +28,7 @@
       "items": [
         "terminology/api-gateway",
         "terminology/consumer",
+        "terminology/consumer-group",
         "terminology/global-rule",
         "terminology/plugin",
         "terminology/plugin-config",
diff --git a/docs/en/latest/terminology/consumer-group.md b/docs/en/latest/terminology/consumer-group.md
new file mode 100644
index 000000000..8daedf11f
--- /dev/null
+++ b/docs/en/latest/terminology/consumer-group.md
@@ -0,0 +1,100 @@
+---
+title: Consumer Group
+keywords:
+  - API gateway
+  - Apache APISIX
+  - Consumer Group
+description: Consumer Group in Apache APISIX.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+Consumer Groups are used to extract commonly used [Plugin](./plugin.md) configurations and can be bound directly to a [Consumer](./consumer.md).
+
+With consumer groups, you can define any number of plugins (e.g. rate limiting) and apply them to a set of consumers,
+instead of managing each consumer individually.
+
+While configuring the same plugin for the same route, only one copy of the configuration is valid.
+
+The example below illustrates how to create a Consumer Group and bind it to a Consumer:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/consumer_groups/company_a -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+        "limit-count": {
+            "count": 200,
+            "time_window": 60,
+            "rejected_code": 503,
+            "group": "$consumer_group_id"
+        }
+    }
+}'
+```
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "username": "jack",
+    "plugins": {
+        "key-auth": {
+            "key": "auth-one"
+        }
+    },
+    "group_id": "company_a"
+}'
+```
+
+When APISIX can't find the Consumer Group referenced by `group_id`, the Admin API request is rejected with a status code of `400`.
+
+If a Consumer already has the `plugins` field configured, the plugins in the Consumer Group will effectively be merged into it. The same plugin in the Consumer Group will not override the one configured directly in the Consumer.
+
+For example, if we configure a Consumer Group as shown below:
+
+```json
+{
+    "id": "bar",
+    "plugins": {
+        "response-rewrite": {
+            "body": "hello"
+        }
+    }
+}
+```
+
+And bind it to a Consumer as shown below:
+
+```json
+{
+    "username": "foo",
+    "group_id": "bar",
+    "plugins": {
+        "basic-auth": {
+            "username": "foo",
+            "password": "bar"
+        },
+        "response-rewrite": {
+            "body": "world"
+        }
+    }
+}
+```
+
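+Then the `body` in `response-rewrite` stays `world`, because the plugin configured directly in the Consumer takes precedence over the one in the Consumer Group.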
diff --git a/docs/en/latest/terminology/plugin.md b/docs/en/latest/terminology/plugin.md
index fd2656b2d..53227904a 100644
--- a/docs/en/latest/terminology/plugin.md
+++ b/docs/en/latest/terminology/plugin.md
@@ -27,7 +27,7 @@ This represents the configuration of the plugins that are executed during the HT
 
 :::note
 
-While configuring the same plugin, only one copy of the configuration is valid. The order of precedence is always `Consumer` > `Route` > `Plugin Config` > `Service`.
+While configuring the same plugin, only one copy of the configuration is valid. The order of precedence is always `Consumer` > `Consumer Group` > `Route` > `Plugin Config` > `Service`.
 
 :::
 
diff --git a/docs/zh/latest/admin-api.md b/docs/zh/latest/admin-api.md
index b5a2ff19c..ab465324e 100644
--- a/docs/zh/latest/admin-api.md
+++ b/docs/zh/latest/admin-api.md
@@ -107,6 +107,7 @@ $ curl "http://127.0.0.1:9180/apisix/admin/routes?page=1&page_size=10" \
 目前支持分页查询的资源如下:
 
 - Consumer
+- Consumer Group
 - Global Rules
 - Plugin Config
 - Proto
@@ -568,6 +569,7 @@ HTTP/1.1 200 OK
 | 名字        | 可选项 | 类型     | 说明                                                                                                                             | 示例                                             |
 | ----------- | ------ | -------- | -------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------ |
 | username    | 必需   | 辅助     | Consumer 名称。                                                                                                                  |                                                  |
+| group_id    | 可选   | 辅助     | Consumer Group 名称。                                                                                                                  |                                                  |
 | plugins     | 可选   | Plugin   | 该 Consumer 对应的插件配置,它的优先级是最高的:Consumer > Route > Service。对于具体插件配置,可以参考 [Plugins](#plugin) 章节。 |                                                  |
 | desc        | 可选   | 辅助     | consumer 描述                                                                                                                    |                                                  |
 | labels      | 可选   | 匹配规则 | 标识附加属性的键值对                                                                                                             | {"version":"v2","build":"16","env":"production"} |
@@ -960,6 +962,35 @@ ssl 对象 json 配置内容:
 
 [Back to TOC](#目录)
 
+## Consumer Group
+
+*地址*:/apisix/admin/consumer_groups/{id}
+
+*说明*:配置一组可以在 Consumer 间复用的插件。
+
+### 请求方法
+
+| 名字   | 请求 uri                                 | 请求 body | 说明                                                                                                                                                                                     |
+| ------ | ---------------------------------------- | --------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| GET    | /apisix/admin/consumer_groups             | 无        | 获取资源列表                                                                                                                                                                             |
+| GET    | /apisix/admin/consumer_groups/{id}        | 无        | 获取资源                                                                                                                                                                                 |
+| PUT    | /apisix/admin/consumer_groups/{id}        | {...}     | 根据 id 创建资源                                                                                                                                                                         |
+| DELETE | /apisix/admin/consumer_groups/{id}        | 无        | 删除资源                                                                                                                                                                                 |
+| PATCH  | /apisix/admin/consumer_groups/{id}        | {...}     | 标准 PATCH ,修改已有 Consumer Group 的部分属性,其他不涉及的属性会原样保留;如果你要删除某个属性,将该属性的值设置为 null 即可删除;特别地,当需要修改属性的值为数组时,该属性将全量更新 |
+| PATCH  | /apisix/admin/consumer_groups/{id}/{path} | {...}     | SubPath PATCH,通过 {path} 指定 Consumer Group 要更新的属性,全量更新该属性的数据,其他不涉及的属性会原样保留。                                                                           |
+
+### body 请求参数
+
+| 名字      | 可选项   | 类型 | 说明        | 示例 |
+|---------|---------|----|-----------|----|
+|plugins  | 必需 |Plugin| 详见 [Plugin](terminology/plugin.md) ||
+|desc     | 可选 | 辅助 | 标识描述、使用场景等 |customer xxxx|
+|labels   | 可选 | 辅助 | 标识附加属性的键值对 |{"version":"v2","build":"16","env":"production"}|
+|create_time| 可选 | 辅助 | 单位为秒的 epoch 时间戳,如果不指定则自动创建 |1602883670|
+|update_time| 可选 | 辅助 | 单位为秒的 epoch 时间戳,如果不指定则自动创建 |1602883670|
+
+[Back to TOC](#目录)
+
 ## Plugin Config
 
 *地址*:/apisix/admin/plugin_configs/{id}
diff --git a/docs/zh/latest/apisix-variable.md b/docs/zh/latest/apisix-variable.md
index 194d0d567..358b327d9 100644
--- a/docs/zh/latest/apisix-variable.md
+++ b/docs/zh/latest/apisix-variable.md
@@ -37,6 +37,7 @@ APISIX 除了支持 [NGINX 变量](http://nginx.org/en/docs/varindex.html)外,
 | balancer_ip         | core       | 上游服务器的 IP 地址。                                                            | 192.168.1.2      |
 | balancer_port       | core       | 上游服务器的端口。                                                                | 80               |
 | consumer_name       | core       | 消费者的名称。                                                                    |                  |
+| consumer_group_id   | core       | 消费者所在的组的 ID。                                                            |                  |
 | graphql_name        | core       | GraphQL 的 [operation name](https://graphql.org/learn/queries/#operation-name)。 | HeroComparison   |
 | graphql_operation   | core       | GraphQL 的操作类型。                                                              | mutation         |
 | graphql_root_fields | core       | GraphQL 最高级别的字段。                                                          | ["hero"]          |
diff --git a/t/admin/consumer-group.t b/t/admin/consumer-group.t
new file mode 100644
index 000000000..176ed9f8a
--- /dev/null
+++ b/t/admin/consumer-group.t
@@ -0,0 +1,549 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();  # blocks run in file order; later blocks read state written by earlier ones
+log_level("info");
+
+add_block_preprocessor(sub {  # fills in defaults for every test block in __DATA__
+    my ($block) = @_;
+
+    if (!$block->request) {  # no request section given: hit the /t location
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {  # no log expectation given at all
+        $block->set_value("no_error_log", "[error]\n[alert]");  # default expectation: neither pattern appears in the error log
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: PUT
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- create the group with id company_a
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "limit-count": {
+                            "count": 2,
+                            "time_window": 60,
+                            "rejected_code": 503,
+                            "key": "remote_addr"
+                        }
+                    }
+                }]],  -- end of request body; the next literal is presumably the pattern the response is matched against, confirm in lib.test_admin
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 2,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        }
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- read the stored value back from etcd
+            local create_time = res.body.node.value.create_time
+            assert(create_time ~= nil, "create_time is nil")  -- the request body above did not send create_time
+            local update_time = res.body.node.value.update_time
+            assert(update_time ~= nil, "update_time is nil")  -- nor update_time; both must be present in storage
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: GET
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- fetch the group written by TEST 1
+                ngx.HTTP_GET,
+                nil,  -- GET sends no request body
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 2,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        }
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 3: GET all
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups',  -- no id in the path: list request
+                ngx.HTTP_GET,
+                nil,  -- no request body; the expected entry below also carries a policy field that the PUT in TEST 1 did not send
+                [[{
+                    "total": 1,
+                    "list": [
+                        {
+                            "key": "/apisix/consumer_groups/company_a",
+                            "value": {
+                                "plugins": {
+                                    "limit-count": {
+                                    "time_window": 60,
+                                    "policy": "local",
+                                    "count": 2,
+                                    "key": "remote_addr",
+                                    "rejected_code": 503
+                                    }
+                                }
+                            }
+                        }
+                    ]
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: PATCH
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- snapshot the stored value before patching
+            local prev_create_time = res.body.node.value.create_time
+            assert(prev_create_time ~= nil, "create_time is nil")
+            local prev_update_time = res.body.node.value.update_time
+            assert(prev_update_time ~= nil, "update_time is nil")
+            ngx.sleep(1)  -- timestamps are documented as epoch seconds, so wait one second to make a changed update_time observable
+
+            local code, body = t('/apisix/admin/consumer_groups/company_a',
+                ngx.HTTP_PATCH,
+                [[{
+                    "plugins": {
+                    "limit-count": {
+                        "count": 3,
+                        "time_window": 60,
+                        "rejected_code": 503,
+                        "key": "remote_addr"
+                    }
+                }}]],  -- request body changes count from 2 to 3
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 3,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        }
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- re-read after the PATCH
+            local create_time = res.body.node.value.create_time
+            assert(prev_create_time == create_time, "create_time mismatched")  -- PATCH must leave create_time untouched
+            local update_time = res.body.node.value.update_time
+            assert(update_time ~= nil, "update_time is nil")
+            assert(prev_update_time ~= update_time, "update_time should be changed")  -- and must move update_time
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: PATCH (sub path)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- snapshot the stored value before patching
+            local prev_create_time = res.body.node.value.create_time
+            assert(prev_create_time ~= nil, "create_time is nil")
+            local prev_update_time = res.body.node.value.update_time
+            assert(prev_update_time ~= nil, "update_time is nil")
+            ngx.sleep(1)  -- timestamps are documented as epoch seconds, so wait one second to make a changed update_time observable
+
+            local code, body = t('/apisix/admin/consumer_groups/company_a/plugins',  -- trailing path segment names the attribute to replace
+                ngx.HTTP_PATCH,
+                [[{
+                    "limit-count": {
+                        "count": 2,
+                        "time_window": 60,
+                        "rejected_code": 503,
+                        "key": "remote_addr"
+                    }
+                }]],  -- request body is the new value of plugins only, putting count back to 2
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 2,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        }
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- re-read after the PATCH
+            local create_time = res.body.node.value.create_time
+            assert(prev_create_time == create_time, "create_time mismatched")  -- sub-path PATCH must leave create_time untouched
+            local update_time = res.body.node.value.update_time
+            assert(update_time ~= nil, "update_time is nil")
+            assert(prev_update_time ~= update_time, "update_time should be changed")  -- and must move update_time
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: invalid plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',
+                ngx.HTTP_PUT,  -- the limit-count config below leaves out count on purpose
+                [[{
+                    "plugins": {
+                        "limit-count": {
+                            "rejected_code": 503,
+                            "time_window": 60,
+                            "key": "remote_addr"
+                        }
+                    }
+                }]]
+                )
+
+            ngx.status = code  -- propagate the failure status so error_code below can check it
+            ngx.print(body)  -- print without a trailing newline so the body matches the expected JSON exactly
+        }
+    }
+--- response_body
+{"error_msg":"failed to check the configuration of plugin limit-count err: property \"count\" is required"}
+--- error_code: 400
+
+
+
+=== TEST 7: PUT (with non-plugin fields)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- same group id as before, now with labels and desc
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "limit-count": {
+                            "count": 2,
+                            "time_window": 60,
+                            "rejected_code": 503,
+                            "key": "remote_addr"
+                        }
+                    },
+                    "labels": {
+                        "你好": "世界"
+                    },
+                    "desc": "blah"
+                }]],  -- end of request body; labels use non-ASCII key and value
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 2,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        },
+                        "labels": {
+                            "你好": "世界"
+                        },
+                        "desc": "blah"
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- read the stored value back from etcd
+            local create_time = res.body.node.value.create_time
+            assert(create_time ~= nil, "create_time is nil")  -- the request body above did not send create_time
+            local update_time = res.body.node.value.update_time
+            assert(update_time ~= nil, "update_time is nil")  -- nor update_time; both must be present in storage
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 8: GET (with non-plugin fields)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- fetch what TEST 7 stored
+                ngx.HTTP_GET,
+                nil,  -- GET sends no request body
+                [[{
+                    "value": {
+                        "plugins": {
+                            "limit-count": {
+                                "count": 2,
+                                "time_window": 60,
+                                "rejected_code": 503,
+                                "key": "remote_addr"
+                            }
+                        },
+                        "labels": {
+                            "你好": "世界"
+                        },
+                        "desc": "blah"
+                    },
+                    "key": "/apisix/consumer_groups/company_a"
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: invalid non-plugin fields
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',
+                ngx.HTTP_PUT,  -- labels is sent as a string below; the expected error says an object is required
+                [[{
+                    "labels": "a",
+                    "plugins": {
+                    }
+                }]]
+                )
+
+            ngx.status = code  -- propagate the failure status so error_code below can check it
+            ngx.print(body)  -- print without a trailing newline so the body matches the expected JSON exactly
+        }
+    }
+--- response_body
+{"error_msg":"invalid configuration: property \"labels\" validation failed: wrong type: expected object, got string"}
+--- error_code: 400
+
+
+
+=== TEST 10: set consumer-group
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- put a plugins-only group in place for TEST 11 to 14
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "limit-count": {
+                            "count": 2,
+                            "time_window": 60,
+                            "rejected_code": 503,
+                            "key": "remote_addr"
+                        }
+                    }
+                }]]
+                )
+
+            ngx.status = code  -- expose the admin API status as the status of /t
+            ngx.say(body)
+
+            local res = assert(etcd.get('/consumer_groups/company_a'))  -- read the stored value back from etcd
+            local create_time = res.body.node.value.create_time
+            assert(create_time ~= nil, "create_time is nil")
+            local update_time = res.body.node.value.update_time
+            assert(update_time ~= nil, "update_time is nil")
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 11: add consumer with group
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumers/foobar',  -- consumer foobar joins group company_a via group_id
+                ngx.HTTP_PUT,
+                [[{
+                    "username": "foobar",
+                    "plugins": {
+                        "key-auth": {
+                            "key": "auth-two"
+                        }
+                    },
+                    "group_id": "company_a"
+                }]]
+                )
+
+            if code >= 300 then  -- only override the default status when the admin call failed
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 12: delete-consumer group failed
+--- config
+    location /t {
+        content_by_lua_block {
+            ngx.sleep(0.3)  -- NOTE(review): presumably lets the consumer created in TEST 11 sync before the delete is attempted; confirm
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',
+                 ngx.HTTP_DELETE
+            )
+            ngx.print(body)  -- code is not propagated here, so only the error body is checked, not the status
+        }
+    }
+--- response_body
+{"error_msg":"can not delete this consumer group, consumer [foobar] is still using it now"}
+
+
+
+=== TEST 13: delete consumer
+--- config
+    location /t {
+        content_by_lua_block {
+            ngx.sleep(0.3)  -- NOTE(review): presumably a sync wait carried over from the previous block; confirm it is needed
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumers/foobar',  -- remove the consumer that still references company_a
+                 ngx.HTTP_DELETE
+            )
+            ngx.say(body)  -- code is unused; success is judged by the body alone
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 14: delete consumer-group
+--- config
+    location /t {
+        content_by_lua_block {
+            ngx.sleep(0.3)  -- NOTE(review): presumably waits until the consumer deletion from TEST 13 is visible; confirm
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumer_groups/company_a',  -- same request as TEST 12, expected to pass now
+                 ngx.HTTP_DELETE
+            )
+            ngx.say(body)  -- code is unused; success is judged by the body alone
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 15: add consumer with invalid group
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/consumers/foobar',  -- group_id below names a group no earlier block created
+                ngx.HTTP_PUT,
+                [[{
+                    "username": "foobar",
+                    "plugins": {
+                        "key-auth": {
+                            "key": "auth-two"
+                        }
+                    },
+                    "group_id": "invalid_group"
+                }]]
+                )
+            assert(code >= 300)  -- the request must be rejected; the status is not written to the response
+            ngx.say(body)
+        }
+    }
+--- response_body_like
+.*failed to fetch consumer group info by consumer group id.*
diff --git a/t/config-center-yaml/consumer-group.t b/t/config-center-yaml/consumer-group.t
new file mode 100644
index 000000000..ff6f6a64c
--- /dev/null
+++ b/t/config-center-yaml/consumer-group.t
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('info');
+no_root_location();
+no_shuffle();  # run blocks in file order
+
+add_block_preprocessor(sub {  # fills in defaults for every test block in __DATA__
+    my ($block) = @_;
+
+    my $yaml_config = $block->yaml_config // <<_EOC_;  # use the block's own yaml_config if set, else this data-plane config with the yaml provider
+apisix:
+    node_listen: 1984
+deployment:
+    role: data_plane
+    role_data_plane:
+        config_provider: yaml
+_EOC_
+
+    $block->set_value("yaml_config", $yaml_config);
+
+    my $routes = <<_EOC_;  # shared route: /hello protected by key-auth, proxied to 127.0.0.1:1980
+routes:
+  -
+    uri: /hello
+    plugins:
+        key-auth:
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $block->apisix_yaml . $routes);  # append the shared route after the block's own YAML, so the combined text ends with the route's #END
+
+    if (!$block->request) {  # no request section given: call /hello with apikey=one
+        $block->set_value("request", "GET /hello?apikey=one");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {  # no log expectation given at all
+        $block->set_value("no_error_log", "[error]");  # default expectation: no [error] in the error log
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: sanity
+--- apisix_yaml
+consumer_groups:
+    - id: foobar
+      plugins:
+          response-rewrite:
+              body: "hello\n"  # only the group configures response-rewrite; this is the expected response body
+consumers:
+    - username: one
+      group_id: foobar  # refers to the consumer group id defined above
+      plugins:
+          key-auth:
+              key: one  # matches apikey=one in the default request
+#END
+--- response_body
+hello
+
+
+
+=== TEST 2: consumer group not found
+--- apisix_yaml
+consumers:
+   - username: one
+     group_id: invalid_group  # no consumer_groups section exists in this block, so the id cannot resolve
+     plugins:
+       key-auth:
+         key: one  # matches apikey=one in the default request
+#END
+--- error_code: 503
+--- error_log
+failed to fetch consumer group config by id: invalid_group
+
+
+
+=== TEST 3: plugin priority
+--- apisix_yaml
+consumer_groups:
+    - id: foobar
+      plugins:
+        response-rewrite:
+          body: "hello\n"  # group-level value, expected to lose to the consumer-level one
+consumers:
+  - username: one
+    group_id: foobar
+    plugins:
+      key-auth:
+        key: one  # matches apikey=one in the default request
+      response-rewrite:
+        body: "world\n"  # same plugin set on the consumer itself; this is the expected response body
+#END
+--- response_body
+world
+
+
+
+=== TEST 4: invalid plugin
+--- apisix_yaml
+consumer_groups:
+    - id: foobar
+      plugins:
+        example-plugin:  # the expected error_log below reports this plugin failing its configuration check
+          skey: "s"
+        response-rewrite:
+          body: "hello\n"
+consumers:
+  - username: one
+    group_id: foobar  # the expected error_log also reports that this group cannot be fetched
+    plugins:
+      key-auth:
+        key: one  # matches apikey=one in the default request
+#END
+--- error_code: 503
+--- error_log
+failed to check the configuration of plugin example-plugin
+failed to fetch consumer group config by id: foobar
diff --git a/t/node/consumer-group.t b/t/node/consumer-group.t
new file mode 100644
index 000000000..5f863ad00
--- /dev/null
+++ b/t/node/consumer-group.t
@@ -0,0 +1,312 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('warn');
+no_root_location();
+no_shuffle();  # run blocks in file order
+
+add_block_preprocessor(sub {  # fills in defaults for every test block in __DATA__
+    my ($block) = @_;
+
+    if (!$block->request) {  # no request section given: hit the /t location
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!$block->error_log && !$block->no_error_log) {  # truthiness check, not defined-ness as in the other two new test files
+        $block->set_value("no_error_log", "[error]");  # default expectation: no [error] in the error log
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: consumer group usage
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            local code, err = t('/apisix/admin/consumer_groups/bar',  -- group bar rewrites the response body to hello
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "response-rewrite": {
+                            "body": "hello"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then  -- same failure threshold as the consumer step below
+                ngx.log(ngx.ERR, err)
+                return
+            end
+
+            local code, body = t('/apisix/admin/consumers',  -- consumer foo authenticates with basic-auth and joins group bar
+                ngx.HTTP_PUT,
+                [[{
+                    "username": "foo",
+                    "group_id": "bar",
+                    "plugins": {
+                        "basic-auth": {
+                            "username": "foo",
+                            "password": "bar"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, err = t('/apisix/admin/routes/1',  -- route /hello requires basic-auth
+                ngx.HTTP_PUT,
+                [[{
+                    "uri": "/hello",
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "plugins": {
+                        "basic-auth": {}
+                    }
+                }]]
+            )
+            if code >= 300 then  -- same failure threshold as the consumer step above
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.sleep(0.5)  -- give the new configuration time to sync before sending traffic
+
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/hello"
+            local headers = {
+                ["Authorization"] = "Basic Zm9vOmJhcg=="
+            }
+            local res = assert(httpc:request_uri(uri, {headers = headers}))  -- raise the transport error instead of indexing a nil response
+            ngx.say(res.body)
+
+            local code, err = t('/apisix/admin/consumer_groups/bar',  -- change the group body to world; the consumer is not touched
+                ngx.HTTP_PATCH,
+                [[{
+                    "plugins": {
+                        "response-rewrite": {
+                            "body": "world"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then  -- same failure threshold as the steps above
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.sleep(0.1)  -- wait for the patched group to sync
+
+            res = assert(httpc:request_uri(uri, {headers = headers}))  -- second call must observe the patched group
+            ngx.say(res.body)
+        }
+    }
+--- response_body
+hello
+world
+
+
+
+=== TEST 2: validated plugins configuration via incremental sync (malformed data)
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local core = require("apisix.core")
+
+            assert(core.etcd.set("/consumer_groups/bar",  -- written straight to etcd rather than through the admin API
+                {id = "bar", plugins = { ["uri-blocker"] = { block_rules =  1 }}}
+            ))  -- block_rules is a number here; the expected error_log shows it failing validation
+            -- wait for sync
+            ngx.sleep(0.6)
+
+            assert(core.etcd.delete("/consumer_groups/bar"))  -- remove the malformed entry so later blocks start clean
+        }
+    }
+--- error_log
+property "block_rules" validation failed
+
+
+
+=== TEST 3: don't override the plugin in the consumer
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            local code, err = t('/apisix/admin/consumer_groups/bar',  -- group bar rewrites the response body to hello
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "response-rewrite": {
+                            "body": "hello"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then  -- same failure threshold as the consumer step below
+                ngx.log(ngx.ERR, err)
+                return
+            end
+
+            local code, body = t('/apisix/admin/consumers',  -- consumer foo joins group bar but also sets response-rewrite itself
+                ngx.HTTP_PUT,
+                [[{
+                    "username": "foo",
+                    "group_id": "bar",
+                    "plugins": {
+                        "basic-auth": {
+                            "username": "foo",
+                            "password": "bar"
+                        },
+                        "response-rewrite": {
+                            "body": "world"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, err = t('/apisix/admin/routes/1',  -- route /hello requires basic-auth
+                ngx.HTTP_PUT,
+                [[{
+                    "uri": "/hello",
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "plugins": {
+                        "basic-auth": {}
+                    }
+                }]]
+            )
+            if code >= 300 then  -- same failure threshold as the consumer step above
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.sleep(0.1)  -- give the new configuration time to sync before sending traffic
+
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/hello"
+            local headers = {
+                ["Authorization"] = "Basic Zm9vOmJhcg=="
+            }
+            local res = assert(httpc:request_uri(uri, {headers = headers}))  -- raise the transport error instead of indexing a nil response
+            ngx.say(res.body)  -- expected to be the consumer-level body, not the group-level one
+        }
+    }
+--- response_body
+world
+
+
+
+=== TEST 4: check consumer_group_id var
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test  -- helper used for the admin API calls below
+            -- 1) group "bar": access-phase function prints ctx.var.consumer_group_id and exits 200
+            local code, err = t('/apisix/admin/consumer_groups/bar',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "serverless-post-function": {
+                            "phase": "access",
+                            "functions" : ["return function(_, ctx) ngx.say(ctx.var.consumer_group_id); ngx.exit(200); end"]
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            -- 2) consumer "foo" references group "bar" and carries basic-auth credentials foo/bar
+            local code, body = t('/apisix/admin/consumers',
+                ngx.HTTP_PUT,
+                [[{
+                    "username": "foo",
+                    "group_id": "bar",
+                    "plugins": {
+                        "basic-auth": {
+                            "username": "foo",
+                            "password": "bar"
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            -- 3) route 1 serves /hello with basic-auth enabled
+            local code, err = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "uri": "/hello",
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "plugins": {
+                        "basic-auth": {}
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.sleep(0.5)  -- presumably lets the admin changes take effect; TODO confirm
+            -- 4) call /hello as "foo"; the response body is echoed and compared with response_body
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port
+                        .. "/hello"
+            local headers = {
+                ["Authorization"] = "Basic Zm9vOmJhcg=="  -- base64 of "foo:bar"
+            }
+            local res, err = httpc:request_uri(uri, {headers = headers})
+            ngx.print(res and res.body or err)  -- res is nil on failure: show err, don't index nil
+        }
+    }
+--- response_body
+bar