You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/03/13 13:45:14 UTC

[GitHub] [apisix] spacewander commented on a change in pull request #6599: feat: add tars discovery

spacewander commented on a change in pull request #6599:
URL: https://github.com/apache/apisix/pull/6599#discussion_r825449599



##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+    local endpoint_content = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DICT, servant: ", servant)
+        return nil
+    end
+
+    local endpoint = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode endpoint content failed, content: ", endpoint_content)
+        return nil
+    end
+
+    return endpoint
+end
+
+
+local function get_endpoint(servant)
+    local endpoint_version = endpoint_dict:get_stale(servant .. "#version")
+    if not endpoint_version then
+        return nil
+    end
+
+    return endpoint_lrucache(servant, endpoint_version, create_endpoint_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+    for _, p in ipairs(query_result) do
+        repeat
+            local servant = p.servant
+
+            if servant == ngx.null then
+                break
+            end
+
+            if p.activated == 1 then
+                activated_buffer[servant] = ngx.null
+            elseif p.activated == 0 then
+                if activated_buffer[servant] == nil then
+                    delete_endpoint(servant)
+                end
+                break
+            end
+
+            core.table.clear(nodes_buffer)
+            local iterator = ngx.re.gmatch(p.endpoints, endpoints_pattern, "jao")
+            while true do
+                local captures, err = iterator()
+                if err then
+                    core.log.error("gmatch failed, error: ", err, " , endpoints: ", p.endpoints)
+                    break
+                end
+
+                if not captures then
+                    break
+                end
+
+                local host, port
+                if captures[3] == "h" or captures[3] == "H" then
+                    host = captures[4]
+                    port = tonumber(captures[8])
+                else
+                    host = captures[8]
+                    port = tonumber(captures[4])
+                end
+
+                core.table.insert(nodes_buffer, {
+                    host = host,
+                    port = port,
+                    weight = default_weight,
+                })
+            end
+            update_endpoint(servant, nodes_buffer)
+        until true
+    end
+end
+
+
+local function fetch_full(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(full_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    endpoint_dict:flush_all()
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+        end
+        extract_endpoint(res)
+    end
+    endpoint_dict:flush_expired()
+end
+
+
+local function fetch_incremental(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(incremental_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    core.table.clear(activated_buffer)
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+            return err
+        end
+        extract_endpoint(res)
+    end
+end
+
+
+local function fetch_endpoint(premature, conf)
+    if premature then
+        return
+    end
+
+    local db_cli, err = mysql:new()
+    if not db_cli then
+        core.log.error("failed to instantiate mysql: ", err)
+        return
+    end
+    db_cli:set_timeout(3000)
+
+    local ok, err, errcode, sqlstate = db_cli:connect(conf.db_conf)
+    if not ok then
+        core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
+        return
+    end
+
+    local now = ngx.time()
+
+    if last_fetch_error or last_fetch_full_time + conf.full_fetch_interval <= now then
+        last_fetch_full_time = now
+        last_fetch_error = fetch_full(db_cli)
+    else
+        last_fetch_error = fetch_incremental(db_cli)
+    end
+
+    if not last_fetch_error then
+        db_cli:set_keepalive(120 * 1000, 1)
+    end
+end
+
+
+function _M.nodes(servant)
+    return get_endpoint(servant)
+end
+
+
+function _M.init_worker()
+    -- TODO: maybe we can read dict name from discovery config
+    endpoint_dict = ngx.shared.discovery
+    if not endpoint_dict then
+        error("failed to get nginx shared dict: discovery, please check your APISIX version")
+    end
+
+    if process.type() ~= "privileged agent" then
+        return
+    end
+
+    local conf = local_conf.discovery.tars
+    default_weight = local_conf.discovery.tars.default_weight or 100

Review comment:
       ```suggestion
       default_weight = conf.default_weight
   ```
   The default weight is set already.

##########
File path: t/tars/discovery/tars.t
##########
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('debug');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  tars:
+    db_conf:
+      host: 127.0.0.1
+      port: 3306
+      database: db_tars
+      user: root
+      password: tars2022
+    full_fetch_interval: 90
+    incremental_fetch_interval: 5
+_EOC_
+
+    $block->set_value("yaml_config", $yaml_config);
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $config = $block->config // <<_EOC_;
+        location /count {
+            content_by_lua_block {
+              local core = require("apisix.core")
+              local d = require("apisix.discovery.tars")
+
+              ngx.sleep(11)
+
+              ngx.req.read_body()
+              local request_body = ngx.req.get_body_data()
+              local queries = core.json.decode(request_body)
+              local response_body = "{"
+              for _,query in ipairs(queries) do
+                local nodes = d.nodes(query)
+                if nodes==nil or #nodes==0 then
+                    response_body=response_body.." "..0
+                else
+                    response_body=response_body.." "..#nodes
+                end
+              end
+              ngx.say(response_body.." }")
+            }
+        }
+
+        location /nodes {
+            content_by_lua_block {
+              local core = require("apisix.core")
+              local d = require("apisix.discovery.tars")
+
+              ngx.sleep(11)
+
+              ngx.req.read_body()
+              local servant = ngx.req.get_body_data()
+              local response=""
+              local nodes = d.nodes(servant)
+              response="{"
+              for _,node in ipairs(nodes or {}) do
+                response=response..node.host..":"..node.port..","
+              end
+              response=response.."}"
+              ngx.say(response)
+            }
+        }
+
+        location /sql {
+            content_by_lua_block {
+                local mysql = require("resty.mysql")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+
+                ngx.req.read_body()
+                local sql = ngx.req.get_body_data()
+                core.log.info("get sql ", sql)
+
+                local db_conf= {
+                  host="127.0.0.1",
+                  port=3306,
+                  database="db_tars",
+                  user="root",
+                  password="tars2022",
+                }
+
+                local db_cli, err = mysql:new()
+                if not db_cli then
+                  core.log.error("failed to instantiate mysql: ", err)
+                  return
+                end
+                db_cli:set_timeout(3000)
+
+                local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
+                if not ok then
+                  core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
+                  return
+                end
+
+                local res, err, errcode, sqlstate = db_cli:query(sql)
+                if not res then
+                   ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
+                   return
+                end
+                ngx.say("DONE")
+            }
+        }
+_EOC_
+
+    $block->set_value("config", $config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create initial server and servant
+--- timeout: 12
+--- yaml_config eval: $::yaml_config

Review comment:
       We already set the yaml_config?

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+    local endpoint_content = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DICT, servant: ", servant)
+        return nil
+    end
+
+    local endpoint = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode endpoint content failed, content: ", endpoint_content)
+        return nil
+    end
+
+    return endpoint
+end
+
+
+local function get_endpoint(servant)
+    local endpoint_version = endpoint_dict:get_stale(servant .. "#version")

Review comment:
       Could you tell me why get_statle is used instead of get?

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)

Review comment:
       ```suggestion
   local function add_endpoint_to_lrucache(servant)
   ```

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(

Review comment:
       Could you show us the data which is matched by this regex?

##########
File path: t/tars/discovery/tars.t
##########
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('debug');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  tars:
+    db_conf:
+      host: 127.0.0.1
+      port: 3306
+      database: db_tars
+      user: root
+      password: tars2022
+    full_fetch_interval: 90
+    incremental_fetch_interval: 5
+_EOC_
+
+    $block->set_value("yaml_config", $yaml_config);
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $config = $block->config // <<_EOC_;
+        location /count {
+            content_by_lua_block {
+              local core = require("apisix.core")
+              local d = require("apisix.discovery.tars")
+
+              ngx.sleep(11)
+
+              ngx.req.read_body()
+              local request_body = ngx.req.get_body_data()
+              local queries = core.json.decode(request_body)
+              local response_body = "{"
+              for _,query in ipairs(queries) do
+                local nodes = d.nodes(query)
+                if nodes==nil or #nodes==0 then
+                    response_body=response_body.." "..0
+                else
+                    response_body=response_body.." "..#nodes
+                end
+              end
+              ngx.say(response_body.." }")
+            }
+        }
+
+        location /nodes {
+            content_by_lua_block {
+              local core = require("apisix.core")
+              local d = require("apisix.discovery.tars")
+
+              ngx.sleep(11)
+
+              ngx.req.read_body()
+              local servant = ngx.req.get_body_data()
+              local response=""
+              local nodes = d.nodes(servant)
+              response="{"
+              for _,node in ipairs(nodes or {}) do
+                response=response..node.host..":"..node.port..","
+              end
+              response=response.."}"
+              ngx.say(response)
+            }
+        }
+
+        location /sql {
+            content_by_lua_block {
+                local mysql = require("resty.mysql")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+
+                ngx.req.read_body()
+                local sql = ngx.req.get_body_data()
+                core.log.info("get sql ", sql)
+
+                local db_conf= {
+                  host="127.0.0.1",
+                  port=3306,
+                  database="db_tars",
+                  user="root",
+                  password="tars2022",
+                }
+
+                local db_cli, err = mysql:new()
+                if not db_cli then
+                  core.log.error("failed to instantiate mysql: ", err)
+                  return
+                end
+                db_cli:set_timeout(3000)
+
+                local ok, err, errcode, sqlstate = db_cli:connect(db_conf)
+                if not ok then
+                  core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
+                  return
+                end
+
+                local res, err, errcode, sqlstate = db_cli:query(sql)
+                if not res then
+                   ngx.say("bad result: ", err, ": ", errcode, ": ", sqlstate, ".")
+                   return
+                end
+                ngx.say("DONE")
+            }
+        }
+_EOC_
+
+    $block->set_value("config", $config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create initial server and servant
+--- timeout: 12
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+"POST /sql
+truncate table t_server_conf",
+
+"POST /sql
+truncate table t_adapter_conf",
+
+"POST /sql
+insert into t_server_conf(application, server_name, node_name, registry_timestamp,
+                          template_name, setting_state, present_state, server_type)
+values ('A', 'AServer', '172.16.1.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
+       ('B', 'BServer', '172.16.2.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp'),
+       ('C', 'CServer', '172.16.3.1', now(), 'taf-cpp', 'active', 'active', 'tars_cpp')",
+
+"POST /sql
+insert into t_adapter_conf(application, server_name, node_name, adapter_name, endpoint, servant)
+values ('A', 'AServer', '172.16.1.1', 'A.AServer.FirstObjAdapter',
+        'tcp -h 172.16.1.1 -p 10001 -e 0 -t 6000', 'A.AServer.FirstObj'),
+       ('B', 'BServer', '172.16.2.1', 'B.BServer.FirstObjAdapter',
+        'tcp -p 10001 -h 172.16.2.1 -e 0 -t 6000', 'B.BServer.FirstObj'),
+       ('C', 'CServer', '172.16.3.1', 'C.CServer.FirstObjAdapter',
+        'tcp -e 0 -h 172.16.3.1 -t 6000 -p 10001 ', 'C.CServer.FirstObj')",
+
+"GET /count
+[\"A.AServer.FirstObj\",\"B.BServer.FirstObj\", \"C.CServer.FirstObj\"]",
+
+]
+--- response_body eval
+[
+    "DONE\n",
+    "DONE\n",
+    "DONE\n",
+    "DONE\n",
+    "{ 1 1 1 }\n",
+]
+--- no_error_log

Review comment:
       Let's save the no_error_log like https://github.com/apache/apisix/blob/1ac7166e80fdd502cc6a8a60ef58f415d3c0e56c/t/plugin/gzip.t#L40 

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+    local endpoint_content = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DICT, servant: ", servant)
+        return nil
+    end
+
+    local endpoint = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode endpoint content failed, content: ", endpoint_content)
+        return nil
+    end
+
+    return endpoint
+end
+
+
+local function get_endpoint(servant)
+    local endpoint_version = endpoint_dict:get_stale(servant .. "#version")
+    if not endpoint_version then
+        return nil
+    end
+
+    return endpoint_lrucache(servant, endpoint_version, create_endpoint_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+    for _, p in ipairs(query_result) do
+        repeat
+            local servant = p.servant
+
+            if servant == ngx.null then
+                break
+            end
+
+            if p.activated == 1 then
+                activated_buffer[servant] = ngx.null
+            elseif p.activated == 0 then
+                if activated_buffer[servant] == nil then
+                    delete_endpoint(servant)
+                end
+                break
+            end
+
+            core.table.clear(nodes_buffer)
+            local iterator = ngx.re.gmatch(p.endpoints, endpoints_pattern, "jao")
+            while true do
+                local captures, err = iterator()
+                if err then
+                    core.log.error("gmatch failed, error: ", err, " , endpoints: ", p.endpoints)
+                    break
+                end
+
+                if not captures then
+                    break
+                end
+
+                local host, port
+                if captures[3] == "h" or captures[3] == "H" then
+                    host = captures[4]
+                    port = tonumber(captures[8])
+                else
+                    host = captures[8]
+                    port = tonumber(captures[4])
+                end
+
+                core.table.insert(nodes_buffer, {
+                    host = host,
+                    port = port,
+                    weight = default_weight,
+                })
+            end
+            update_endpoint(servant, nodes_buffer)
+        until true
+    end
+end
+
+
+local function fetch_full(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(full_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    endpoint_dict:flush_all()
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+        end
+        extract_endpoint(res)
+    end
+    endpoint_dict:flush_expired()
+end
+
+
+local function fetch_incremental(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(incremental_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    core.table.clear(activated_buffer)
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+            return err
+        end
+        extract_endpoint(res)
+    end
+end
+
+
+local function fetch_endpoint(premature, conf)
+    if premature then
+        return
+    end
+
+    local db_cli, err = mysql:new()
+    if not db_cli then
+        core.log.error("failed to instantiate mysql: ", err)
+        return
+    end
+    db_cli:set_timeout(3000)
+
+    local ok, err, errcode, sqlstate = db_cli:connect(conf.db_conf)
+    if not ok then
+        core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
+        return
+    end
+
+    local now = ngx.time()
+
+    if last_fetch_error or last_fetch_full_time + conf.full_fetch_interval <= now then
+        last_fetch_full_time = now
+        last_fetch_error = fetch_full(db_cli)
+    else
+        last_fetch_error = fetch_incremental(db_cli)
+    end
+
+    if not last_fetch_error then
+        db_cli:set_keepalive(120 * 1000, 1)
+    end
+end
+
+
+function _M.nodes(servant)
+    return get_endpoint(servant)
+end
+
+
+function _M.init_worker()
+    -- TODO: maybe we can read dict name from discovery config
+    endpoint_dict = ngx.shared.discovery

Review comment:
       Let's use a new shdict instead of hijacking authz-keycloak's

##########
File path: t/tars/discovery/tars.t
##########
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('debug');

Review comment:
       Is it necessary to use debug level?

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+    local endpoint_content = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DICT, servant: ", servant)
+        return nil
+    end
+
+    local endpoint = core.json.decode(endpoint_content)
+    if not endpoint then
+        core.log.error("decode endpoint content failed, content: ", endpoint_content)
+        return nil
+    end
+
+    return endpoint
+end
+
+
+local function get_endpoint(servant)
+    local endpoint_version = endpoint_dict:get_stale(servant .. "#version")
+    if not endpoint_version then
+        return nil
+    end
+
+    return endpoint_lrucache(servant, endpoint_version, create_endpoint_lrucache, servant)
+end
+
+
+local function extract_endpoint(query_result)
+    for _, p in ipairs(query_result) do
+        repeat
+            local servant = p.servant
+
+            if servant == ngx.null then
+                break
+            end
+
+            if p.activated == 1 then
+                activated_buffer[servant] = ngx.null
+            elseif p.activated == 0 then
+                if activated_buffer[servant] == nil then
+                    delete_endpoint(servant)
+                end
+                break
+            end
+
+            core.table.clear(nodes_buffer)
+            local iterator = ngx.re.gmatch(p.endpoints, endpoints_pattern, "jao")
+            while true do
+                local captures, err = iterator()
+                if err then
+                    core.log.error("gmatch failed, error: ", err, " , endpoints: ", p.endpoints)
+                    break
+                end
+
+                if not captures then
+                    break
+                end
+
+                local host, port
+                if captures[3] == "h" or captures[3] == "H" then
+                    host = captures[4]
+                    port = tonumber(captures[8])
+                else
+                    host = captures[8]
+                    port = tonumber(captures[4])
+                end
+
+                core.table.insert(nodes_buffer, {
+                    host = host,
+                    port = port,
+                    weight = default_weight,
+                })
+            end
+            update_endpoint(servant, nodes_buffer)
+        until true
+    end
+end
+
+
+local function fetch_full(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(full_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    endpoint_dict:flush_all()
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+        end
+        extract_endpoint(res)
+    end
+    endpoint_dict:flush_expired()
+end
+
+
+local function fetch_incremental(db_cli)
+    local res, err, errcode, sqlstate = db_cli:query(incremental_query_sql)
+    if not res then
+        core.log.error("query failed, error: ", err, ", ", errcode, " ", sqlstate)
+        return err
+    end
+
+    core.table.clear(activated_buffer)
+    extract_endpoint(res)
+
+    while err == "again" do
+        res, err, errcode, sqlstate = db_cli:read_result()
+        if not res then
+            if err then
+                core.log.error("read result failed, error: ", err, ", ", errcode, " ", sqlstate)
+                return err
+            end
+            return err
+        end
+        extract_endpoint(res)
+    end
+end
+
+
+local function fetch_endpoint(premature, conf)
+    if premature then
+        return
+    end
+
+    local db_cli, err = mysql:new()
+    if not db_cli then
+        core.log.error("failed to instantiate mysql: ", err)
+        return
+    end
+    db_cli:set_timeout(3000)
+
+    local ok, err, errcode, sqlstate = db_cli:connect(conf.db_conf)
+    if not ok then
+        core.log.error("failed to connect mysql: ", err, ", ", errcode, ", ", sqlstate)
+        return
+    end
+
+    local now = ngx.time()
+
+    if last_fetch_error or last_fetch_full_time + conf.full_fetch_interval <= now then
+        last_fetch_full_time = now
+        last_fetch_error = fetch_full(db_cli)
+    else
+        last_fetch_error = fetch_incremental(db_cli)
+    end
+
+    if not last_fetch_error then
+        db_cli:set_keepalive(120 * 1000, 1)
+    end

Review comment:
       Need `close` if err?

##########
File path: apisix/discovery/tars/init.lua
##########
@@ -0,0 +1,283 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ngx = ngx
+local format = string.format
+local ipairs = ipairs
+local error = error
+local tonumber = tonumber
+local local_conf = require("apisix.core.config_local").local_conf()
+local core = require("apisix.core")
+local mysql = require("resty.mysql")
+local process = require("ngx.process")
+
+local full_query_sql = [[ select servant, group_concat(endpoint order by endpoint) as endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where setting_state = 'active' and present_state = 'active'
+group by servant ]]
+
+local incremental_query_sql = [[
+select servant, (setting_state = 'active' and present_state = 'active') activated,
+group_concat(endpoint order by endpoint) endpoints
+from t_server_conf left join t_adapter_conf tac using (application, server_name, node_name)
+where (application, server_name) in
+(
+select application, server_name from t_server_conf
+where registry_timestamp > now() - interval %d second
+union
+select application, server_name from t_adapter_conf
+where registry_timestamp > now() - interval %d second
+)
+group by servant, activated order by activated desc ]]
+
+local _M = {
+    version = 0.1,
+}
+
+local endpoint_dict
+local default_weight
+
+local last_fetch_full_time = 0
+local last_fetch_error
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local activated_buffer = core.table.new(10, 0)
+local nodes_buffer = core.table.new(0, 5)
+
+local endpoints_pattern = core.table.concat(
+        { [[tcp(\s*-[te]\s*(\S+)){0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+))]],
+          [[{0,2}\s*-([hpHP])\s*(\S+)(\s*-[teTE]\s*(\S+)){0,2}\s*(,|$)]] }
+)
+
+
+local function update_endpoint(servant, nodes)
+    local endpoint_content = core.json.encode(nodes, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+    core.log.debug("set servant ", servant, endpoint_content)
+    local _, err
+    _, err = endpoint_dict:safe_set(servant .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(servant, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        endpoint_dict:delete(servant .. "#version")
+    end
+end
+
+
+local function delete_endpoint(servant)
+    core.log.info("delete servant ", servant)
+    endpoint_dict:delete(servant .. "#version")
+    endpoint_dict:delete(servant)
+end
+
+
+local function create_endpoint_lrucache(servant)
+    local endpoint_content = endpoint_dict:get_stale(servant)
+    if not endpoint_content then
+        core.log.error("get empty endpoint content from discovery DICT, servant: ", servant)
+        return nil
+    end
+
+    local endpoint = core.json.decode(endpoint_content)

Review comment:
       Let's get the err and put it in the msg




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org