Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/01/05 02:52:55 UTC

[apisix] branch release/2.10 updated: feat: release 2.10.3 (#5958)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch release/2.10
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/release/2.10 by this push:
     new 21d7673  feat: release 2.10.3 (#5958)
21d7673 is described below

commit 21d7673c6e8ff995677456cdebc8ded5afbb3d0a
Author: Zeping Bai <bz...@apache.org>
AuthorDate: Wed Jan 5 10:52:45 2022 +0800

    feat: release 2.10.3 (#5958)
    
    Co-authored-by: kaihaojiang <ka...@tencent.com>
    Co-authored-by: 罗泽轩 <sp...@gmail.com>
    Co-authored-by: jack.fu <ja...@yijinin.com>
    Co-authored-by: sunaowei <su...@neild>
    Co-authored-by: cache-missing <90...@users.noreply.github.com>
    Co-authored-by: tzssangglass <tz...@gmail.com>
    Co-authored-by: 帅进超 <sh...@gmail.com>
    Co-authored-by: leslie <59...@users.noreply.github.com>
    Co-authored-by: jackfu <ja...@gmail.com>
    Co-authored-by: S96EA <ne...@163.com>
---
 .github/workflows/build.yml          |   1 -
 CHANGELOG.md                         |  21 +++++
 apisix/cli/ngx_tpl.lua               |   3 +
 apisix/core/version.lua              |   2 +-
 apisix/patch.lua                     |  47 ++++++++++++
 apisix/plugin.lua                    |  18 +++--
 apisix/plugins/cors.lua              |   4 +-
 apisix/plugins/ext-plugin/init.lua   | 103 +++++++++++++++++++++----
 apisix/plugins/http-logger.lua       |   3 +-
 apisix/plugins/kafka-logger.lua      |   3 +-
 apisix/plugins/proxy-rewrite.lua     |   4 +-
 apisix/plugins/sls-logger.lua        |   3 +-
 apisix/plugins/slslog/rfc5424.lua    |   9 +--
 apisix/plugins/syslog.lua            |   4 +-
 apisix/plugins/tcp-logger.lua        |   4 +-
 apisix/plugins/ua-restriction.lua    |  31 +++++++-
 apisix/plugins/udp-logger.lua        |   4 +-
 apisix/schema_def.lua                |  22 +++++-
 apisix/stream/plugins/mqtt-proxy.lua |  95 ++++++++++++++---------
 apisix/utils/log-util.lua            |  13 ++++
 conf/config-default.yaml             |   1 +
 docs/en/latest/config.json           |   2 +-
 docs/en/latest/health-check.md       |   8 +-
 docs/en/latest/how-to-build.md       |  14 ++--
 docs/zh/latest/CHANGELOG.md          |  21 +++++
 docs/zh/latest/config.json           |   2 +-
 docs/zh/latest/health-check.md       |   8 +-
 docs/zh/latest/how-to-build.md       |  14 ++--
 t/APISIX.pm                          |   1 +
 t/admin/health-check.t               | 114 +++++++++++++++-------------
 t/cli/test_admin.sh                  | 111 +++++++++++++++++++++++++++
 t/lib/ext-plugin.lua                 |   2 +-
 t/plugin/cors.t                      |  69 +++++++++++++++++
 t/plugin/ext-plugin/conf_token.t     | 143 +++++++++++++++++++++++++++++++++++
 t/plugin/ext-plugin/sanity.t         |   2 +
 t/plugin/proxy-rewrite2.t            |  33 ++++++++
 t/plugin/sls-logger.t                |  76 +++++++++++++------
 t/plugin/ua-restriction.t            |  54 +++++++++++++
 t/stream-plugin/mqtt-proxy.t         | 135 +++++++++++++++++++++++++++++++++
 39 files changed, 1017 insertions(+), 187 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 0ea9817..eaca242 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -25,7 +25,6 @@ jobs:
           - linux_openresty
           - linux_openresty_1_17
           - linux_tengine
-          - linux_apisix_master_luarocks
           - linux_apisix_current_luarocks
           - linux_openresty_mtls
 
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 752cb0e..a6edb03 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ title: Changelog
 
 ## Table of Contents
 
+- [2.10.3](#2103)
 - [2.10.2](#2102)
 - [2.10.1](#2101)
 - [2.10.0](#2100)
@@ -48,6 +49,26 @@ title: Changelog
 - [0.7.0](#070)
 - [0.6.0](#060)
 
+## 2.10.3
+
+### Bugfix
+
+- fix: str concat in error call [#5540](https://github.com/apache/apisix/pull/5540)
+- fix: ignore changes of /apisix/plugins/ [#5558](https://github.com/apache/apisix/pull/5558)
+- fix: invalid error after passive health check is changed [#5589](https://github.com/apache/apisix/pull/5589)
+- fix(batch-processor): stale objects were not actually freed [#5700](https://github.com/apache/apisix/pull/5700)
+- fix(patch): add global `math.randomseed` patch support [#5682](https://github.com/apache/apisix/pull/5682)
+- fix(log-rotate): abnormal log collection after enabling compression [#5715](https://github.com/apache/apisix/pull/5715)
+- feat(ext-plugin): avoid sending the conf request multiple times [#5183](https://github.com/apache/apisix/pull/5183)
+- feat: use a lock to ensure the conf token is always fetched from the shdict [#5263](https://github.com/apache/apisix/pull/5263)
+- fix(ext-plugin): don't use stale key [#5782](https://github.com/apache/apisix/pull/5782)
+- fix(mqtt-proxy): client id can be empty [#5816](https://github.com/apache/apisix/pull/5816)
+- fix(sls-logger): log entries could not get a millisecond timestamp [#5820](https://github.com/apache/apisix/pull/5820)
+- fix(ua-restriction): refine plugin configuration check logic [#5728](https://github.com/apache/apisix/pull/5728)
+- fix(cors): compatible with scenarios where origin is modified [#5890](https://github.com/apache/apisix/pull/5890)
+- fix(proxy-rewrite): make sure proxy-rewrite update the core.request.header cache [#5914](https://github.com/apache/apisix/pull/5914)
+- fix(mqtt): handle properties for MQTT 5 [#5916](https://github.com/apache/apisix/pull/5916)
+
 ## 2.10.2
 
 ### Bugfix
diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 07ad5ea..6b78764 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -191,6 +191,9 @@ http {
     # for authz-keycloak
     lua_shared_dict access-tokens {* http.lua_shared_dict["access-tokens"] *}; # cache for service account access tokens
 
+    # for ext-plugin
+    lua_shared_dict ext-plugin {* http.lua_shared_dict["ext-plugin"] *}; # cache for ext-plugin
+
     # for custom shared dict
     {% if http.custom_lua_shared_dict then %}
     {% for cache_key, cache_size in pairs(http.custom_lua_shared_dict) do %}
diff --git a/apisix/core/version.lua b/apisix/core/version.lua
index 1761fb5..92f3f15 100644
--- a/apisix/core/version.lua
+++ b/apisix/core/version.lua
@@ -15,5 +15,5 @@
 -- limitations under the License.
 --
 return {
-    VERSION = "2.10.2"
+    VERSION = "2.10.3"
 }
diff --git a/apisix/patch.lua b/apisix/patch.lua
index 51cb14b..69506e6 100644
--- a/apisix/patch.lua
+++ b/apisix/patch.lua
@@ -20,18 +20,23 @@ local ipmatcher = require("resty.ipmatcher")
 local socket = require("socket")
 local unix_socket = require("socket.unix")
 local ssl = require("ssl")
+local ngx = ngx
 local get_phase = ngx.get_phase
 local ngx_socket = ngx.socket
 local original_tcp = ngx.socket.tcp
 local original_udp = ngx.socket.udp
 local concat_tab = table.concat
+local debug = debug
 local new_tab = require("table.new")
 local log = ngx.log
 local WARN = ngx.WARN
 local ipairs = ipairs
 local select = select
 local setmetatable = setmetatable
+local string = string
+local table = table
 local type = type
+local tonumber = tonumber
 
 
 local config_local
@@ -86,6 +91,48 @@ do
 end
 
 
+do -- `math.randomseed` patch
+    -- `math.random` generates pseudo-random numbers from the seed set by `math.randomseed`.
+    -- Many modules use `ngx.time` and `ngx.worker.pid` to generate seeds, which may
+    -- lose randomness in container environments (where pids are identical, e.g. the root pid is 1).
+    -- Kubernetes may also launch multiple instances of a Deployment's ReplicaSet at the same
+    -- time, so `ngx.time` may return the same value in those pods.
+    -- Therefore, this global patch enforces that the entire framework uses
+    -- best-practice seed generation.
+
+    local resty_random = require("resty.random")
+    local math_randomseed = math.randomseed
+    local seeded = {}
+
+    -- make linter happy
+    -- luacheck: ignore
+    math.randomseed = function()
+        local worker_pid = ngx.worker.pid()
+
+        -- check seed mark
+        if seeded[worker_pid] then
+            log(ngx.DEBUG, debug.traceback("Random seed has been inited", 2))
+            return
+        end
+
+        -- generate randomseed
+        -- chose 6 from APISIX's SIX; 256 ^ 6 should do the trick
+        -- it shouldn't be larger than 16, to prevent overflow.
+        local random_bytes = resty_random.bytes(6)
+        local t = {}
+
+        for i = 1, #random_bytes do
+            t[i] = string.byte(random_bytes, i)
+        end
+
+        local s = table.concat(t)
+
+        math_randomseed(tonumber(s))
+        seeded[worker_pid] = true
+    end
+end -- do
+
+
 local patch_udp_socket
 do
     local old_udp_sock_setpeername
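
For readers skimming the patch above: the seeding strategy boils down to turning a handful of random bytes into a number that `math.randomseed` accepts. A minimal standalone sketch of that derivation in pure Lua; the hard-coded byte string is only an example stand-in for `resty.random.bytes(6)`, so it runs outside OpenResty:

```lua
-- Sketch: derive a numeric seed from raw random bytes, as the patch above does.
-- The byte string below is an example stand-in for resty.random.bytes(6).
local random_bytes = "\42\7\193\88\250\3"

local t = {}
for i = 1, #random_bytes do
    t[i] = string.byte(random_bytes, i)   -- each byte becomes its decimal value
end

-- concatenating the decimal values yields a large, hard-to-predict integer
local seed = tonumber(table.concat(t))

math.randomseed(seed)
print("seed:", seed, "sample:", math.random(1, 100))
```
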
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 1e060cd..7139a0b 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -234,7 +234,7 @@ function _M.load(config)
         local_conf, err = core.config.local_conf(true)
         if not local_conf then
             -- the error is unrecoverable, so we need to raise it
-            error("failed to load the configuration file: ", err)
+            error("failed to load the configuration file: " .. err)
         end
 
         http_plugin_names = local_conf.plugins
@@ -245,13 +245,15 @@ function _M.load(config)
         stream_plugin_names = {}
         local plugins_conf = config.value
         -- plugins_conf can be nil when another instance writes into etcd key "/apisix/plugins/"
-        if plugins_conf then
-            for _, conf in ipairs(plugins_conf) do
-                if conf.stream then
-                    core.table.insert(stream_plugin_names, conf.name)
-                else
-                    core.table.insert(http_plugin_names, conf.name)
-                end
+        if not plugins_conf then
+            return local_plugins
+        end
+
+        for _, conf in ipairs(plugins_conf) do
+            if conf.stream then
+                core.table.insert(stream_plugin_names, conf.name)
+            else
+                core.table.insert(http_plugin_names, conf.name)
             end
         end
     end
diff --git a/apisix/plugins/cors.lua b/apisix/plugins/cors.lua
index 0c6f901..6375173 100644
--- a/apisix/plugins/cors.lua
+++ b/apisix/plugins/cors.lua
@@ -227,6 +227,8 @@ end
 
 
 function _M.rewrite(conf, ctx)
+    -- save the original request Origin, as it may be changed in another phase
+    ctx.original_request_origin = core.request.header(ctx, "Origin")
     if ctx.var.request_method == "OPTIONS" then
         return 200
     end
@@ -234,7 +236,7 @@ end
 
 
 function _M.header_filter(conf, ctx)
-    local req_origin = core.request.header(ctx, "Origin")
+    local req_origin = ctx.original_request_origin
     -- Try allow_origins first, if mismatched, try allow_origins_by_regex.
     local allow_origins
     allow_origins = process_with_allow_origins(conf, ctx, req_origin)
diff --git a/apisix/plugins/ext-plugin/init.lua b/apisix/plugins/ext-plugin/init.lua
index 0e1ec2a..a8882ab 100644
--- a/apisix/plugins/ext-plugin/init.lua
+++ b/apisix/plugins/ext-plugin/init.lua
@@ -40,6 +40,7 @@ if is_http then
     ngx_pipe = require("ngx.pipe")
     events = require("resty.worker.events")
 end
+local resty_lock = require("resty.lock")
 local resty_signal = require "resty.signal"
 local bit = require("bit")
 local band = bit.band
@@ -63,11 +64,18 @@ local type = type
 
 
 local events_list
-local lrucache = core.lrucache.new({
-    type = "plugin",
-    invalid_stale = true,
-    ttl = helper.get_conf_token_cache_time(),
-})
+
+local function new_lrucache()
+    return core.lrucache.new({
+        type = "plugin",
+        invalid_stale = true,
+        ttl = helper.get_conf_token_cache_time(),
+    })
+end
+local lrucache = new_lrucache()
+
+local shdict_name = "ext-plugin"
+local shdict = ngx.shared[shdict_name]
 
 local schema = {
     type = "object",
@@ -293,14 +301,74 @@ local function handle_extra_info(ctx, input)
 end
 
 
+local function fetch_token(key)
+    if shdict then
+        return shdict:get(key)
+    else
+        core.log.error('shm "ext-plugin" not found')
+        return nil
+    end
+end
+
+
+local function store_token(key, token)
+    if shdict then
+        local exp = helper.get_conf_token_cache_time()
+        -- expire early so that, when the lrucache entry nears expiry, prepare_conf_req is sent as in the original behaviour
+        exp = exp * 0.9
+        local success, err, forcible = shdict:set(key, token, exp)
+        if not success then
+            core.log.error("ext-plugin:failed to set conf token, err: ", err)
+        end
+        if forcible then
+            core.log.warn("ext-plugin:set valid items forcibly overwritten")
+        end
+    else
+        core.log.error('shm "ext-plugin" not found')
+    end
+end
+
+
+local function flush_token()
+    if shdict then
+        core.log.warn("flush conf token in shared dict")
+        shdict:flush_all()
+    else
+        core.log.error('shm "ext-plugin" not found')
+    end
+end
+
+
 local rpc_call
 local rpc_handlers = {
     nil,
     function (conf, ctx, sock, unique_key)
+        local token = fetch_token(unique_key)
+        if token then
+            core.log.info("fetch token from shared dict, token: ", token)
+            return token
+        end
+
+        local lock, err = resty_lock:new(shdict_name)
+        if not lock then
+            return nil, "failed to create lock: " .. err
+        end
+
+        local elapsed, err = lock:lock("prepare_conf")
+        if not elapsed then
+            return nil, "failed to acquire the lock: " .. err
+        end
+
+        local token = fetch_token(unique_key)
+        if token then
+            lock:unlock()
+            core.log.info("fetch token from shared dict, token: ", token)
+            return token
+        end
+
         builder:Clear()
 
         local key = builder:CreateString(unique_key)
-
         local conf_vec
         if conf.conf then
             local len = #conf.conf
@@ -331,23 +399,30 @@ local rpc_handlers = {
 
         local ok, err = send(sock, constants.RPC_PREPARE_CONF, builder:Output())
         if not ok then
+            lock:unlock()
             return nil, "failed to send RPC_PREPARE_CONF: " .. err
         end
 
         local ty, resp = receive(sock)
         if ty == nil then
+            lock:unlock()
             return nil, "failed to receive RPC_PREPARE_CONF: " .. resp
         end
 
         if ty ~= constants.RPC_PREPARE_CONF then
+            lock:unlock()
             return nil, "failed to receive RPC_PREPARE_CONF: unexpected type " .. ty
         end
 
         local buf = flatbuffers.binaryArray.New(resp)
         local pcr = prepare_conf_resp.GetRootAsResp(buf, 0)
-        local token = pcr:ConfToken()
+        token = pcr:ConfToken()
 
         core.log.notice("get conf token: ", token, " conf: ", core.json.delay_encode(conf.conf))
+        store_token(unique_key, token)
+
+        lock:unlock()
+
         return token
     end,
     function (conf, ctx, sock, entry)
@@ -471,7 +546,6 @@ local rpc_handlers = {
         local buf = flatbuffers.binaryArray.New(resp)
         local call_resp = http_req_call_resp.GetRootAsResp(buf, 0)
         local action_type = call_resp:ActionType()
-
         if action_type == http_req_call_action.Stop then
             local action = call_resp:Action()
             local stop = http_req_call_stop.New()
@@ -588,15 +662,14 @@ rpc_call = function (ty, conf, ctx, ...)
 end
 
 
-local function create_lrucache()
+local function recreate_lrucache()
+    flush_token()
+
     if lrucache then
         core.log.warn("flush conf token lrucache")
     end
 
-    lrucache = core.lrucache.new({
-        type = "plugin",
-        ttl = helper.get_conf_token_cache_time(),
-    })
+    lrucache = new_lrucache()
 end
 
 
@@ -620,7 +693,7 @@ function _M.communicate(conf, ctx, plugin_name)
         end
 
         core.log.warn("refresh cache and try again")
-        create_lrucache()
+        recreate_lrucache()
     end
 
     core.log.error(err)
@@ -717,7 +790,7 @@ function _M.init_worker()
     )
 
     -- flush cache when runner exited
-    events.register(create_lrucache, events_list._source, events_list.runner_exit)
+    events.register(recreate_lrucache, events_list._source, events_list.runner_exit)
 
     -- note that the runner is run under the same user as the Nginx master
     if process.type() == "privileged agent" then
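
The token handling added above follows the usual OpenResty double-checked pattern: try the shared dict, take a lock, re-check, and only then perform the expensive call. A condensed sketch of that pattern, assuming an OpenResty worker with the `ext-plugin` lua_shared_dict zone configured (as this release does) and a hypothetical `expensive_fetch` callback standing in for the RPC_PREPARE_CONF round trip:

```lua
local resty_lock = require("resty.lock")

local shdict_name = "ext-plugin"
local shdict = ngx.shared[shdict_name]

-- get_or_create: return the cached value for `key`, or compute it once
-- across concurrent requests. `expensive_fetch` is a hypothetical callback.
local function get_or_create(key, expensive_fetch)
    -- fast path: another request may have stored the value already
    local val = shdict:get(key)
    if val then
        return val
    end

    local lock, err = resty_lock:new(shdict_name)
    if not lock then
        return nil, "failed to create lock: " .. err
    end

    local elapsed, lerr = lock:lock(key)
    if not elapsed then
        return nil, "failed to acquire the lock: " .. lerr
    end

    -- re-check under the lock: the winner of the race has filled the dict
    val = shdict:get(key)
    if val then
        lock:unlock()
        return val
    end

    local new_val, ferr = expensive_fetch(key)
    if not new_val then
        lock:unlock()
        return nil, ferr
    end

    shdict:set(key, new_val, 60)  -- TTL in seconds; pick one per use case
    lock:unlock()
    return new_val
end

return get_or_create
```

The same shared dict backs both the cache and the lock here, mirroring how the plugin above reuses its "ext-plugin" zone for `resty.lock`.
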
diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 3d2e7f1..60e41df 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -25,6 +25,7 @@ local plugin          = require("apisix.plugin")
 local ngx      = ngx
 local tostring = tostring
 local ipairs   = ipairs
+local pairs    = pairs
 local timer_at = ngx.timer.at
 
 local plugin_name = "http-logger"
@@ -150,7 +151,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, conf: ",
                           core.json.delay_encode(key))
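
The `ipairs` → `pairs` change above (repeated below for the kafka-logger, sls-logger, syslog, tcp-logger and udp-logger plugins) matters because `buffers` is keyed by configuration, not by consecutive integers, so `ipairs` never visited its entries and stale batch processors were never cleaned up. A small standalone illustration:

```lua
-- buffers is keyed by per-route/per-conf keys, not 1..n
local buffers = {
    ["route#1"] = { entries = {} },
    ["route#2"] = { entries = {} },
}

local visited_ipairs, visited_pairs = 0, 0

for _ in ipairs(buffers) do              -- stops immediately: buffers[1] is nil
    visited_ipairs = visited_ipairs + 1
end

for _ in pairs(buffers) do               -- walks every key, whatever its type
    visited_pairs = visited_pairs + 1
end

print(visited_ipairs, visited_pairs)     -- prints: 0    2
```
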
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index b680bd4..45b1822 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -23,7 +23,6 @@ local plugin = require("apisix.plugin")
 local math     = math
 local pairs    = pairs
 local type     = type
-local ipairs   = ipairs
 local plugin_name = "kafka-logger"
 local stale_timer_running = false
 local timer_at = ngx.timer.at
@@ -138,7 +137,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, conf: ",
                           core.json.delay_encode(key))
diff --git a/apisix/plugins/proxy-rewrite.lua b/apisix/plugins/proxy-rewrite.lua
index 397d6d0..4ad7c07 100644
--- a/apisix/plugins/proxy-rewrite.lua
+++ b/apisix/plugins/proxy-rewrite.lua
@@ -193,8 +193,8 @@ function _M.rewrite(conf, ctx)
 
     local field_cnt = #conf.headers_arr
     for i = 1, field_cnt, 2 do
-        ngx.req.set_header(conf.headers_arr[i],
-                           core.utils.resolve_var(conf.headers_arr[i+1], ctx.var))
+        core.request.set_header(ctx, conf.headers_arr[i],
+                                core.utils.resolve_var(conf.headers_arr[i+1], ctx.var))
     end
 end
 
diff --git a/apisix/plugins/sls-logger.lua b/apisix/plugins/sls-logger.lua
index daeaebb..797b85f 100644
--- a/apisix/plugins/sls-logger.lua
+++ b/apisix/plugins/sls-logger.lua
@@ -26,6 +26,7 @@ local tcp = ngx.socket.tcp
 local buffers = {}
 local tostring = tostring
 local ipairs = ipairs
+local pairs = pairs
 local table = table
 local schema = {
     type = "object",
@@ -116,7 +117,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, route id:", tostring(key))
             buffers[key] = nil
diff --git a/apisix/plugins/slslog/rfc5424.lua b/apisix/plugins/slslog/rfc5424.lua
index ff73d19..5d09a58 100644
--- a/apisix/plugins/slslog/rfc5424.lua
+++ b/apisix/plugins/slslog/rfc5424.lua
@@ -78,16 +78,15 @@ local Severity = {
     DEBUG = LOG_DEBUG,
 }
 
-local os_date = os.date
-local ngx = ngx
-local rfc5424_timestamp_format = "!%Y-%m-%dT%H:%M:%S.000Z"
+local log_util = require("apisix.utils.log-util")
+
+
 local _M = { version = 0.1 }
 
 function _M.encode(facility, severity, hostname, appname, pid, project,
                    logstore, access_key_id, access_key_secret, msg)
     local pri = (Facility[facility] * 8 + Severity[severity])
-    ngx.update_time()
-    local t = os_date(rfc5424_timestamp_format, ngx.now())
+    local t = log_util.get_rfc3339_zulu_timestamp()
     if not hostname then
         hostname = "-"
     end
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua
index a3ec822..3eed59f 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog.lua
@@ -22,7 +22,7 @@ local logger_socket = require("resty.logger.socket")
 local plugin_name = "syslog"
 local ngx = ngx
 local buffers = {}
-local ipairs   = ipairs
+local pairs = pairs
 local stale_timer_running = false;
 local timer_at = ngx.timer.at
 
@@ -121,7 +121,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, conf: ",
                           core.json.delay_encode(key))
diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua
index dc4cb21..8d678b3 100644
--- a/apisix/plugins/tcp-logger.lua
+++ b/apisix/plugins/tcp-logger.lua
@@ -22,7 +22,7 @@ local tostring = tostring
 local buffers = {}
 local ngx = ngx
 local tcp = ngx.socket.tcp
-local ipairs   = ipairs
+local pairs = pairs
 local stale_timer_running = false
 local timer_at = ngx.timer.at
 
@@ -106,7 +106,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, conf: ",
                           core.json.delay_encode(key))
diff --git a/apisix/plugins/ua-restriction.lua b/apisix/plugins/ua-restriction.lua
index 3683a15..ec74e75 100644
--- a/apisix/plugins/ua-restriction.lua
+++ b/apisix/plugins/ua-restriction.lua
@@ -16,6 +16,7 @@
 --
 local ipairs = ipairs
 local core = require("apisix.core")
+local re_compile = require("resty.core.regex").re_match_compile
 local stringx = require('pl.stringx')
 local type = type
 local str_strip = stringx.strip
@@ -36,11 +37,19 @@ local schema = {
         },
         allowlist = {
             type = "array",
-            minItems = 1
+            minItems = 1,
+            items = {
+                type = "string",
+                minLength = 1,
+            }
         },
         denylist = {
             type = "array",
-            minItems = 1
+            minItems = 1,
+            items = {
+                type = "string",
+                minLength = 1,
+            }
         },
         message = {
             type = "string",
@@ -88,6 +97,24 @@ function _M.check_schema(conf)
         return false, err
     end
 
+    if conf.allowlist then
+        for _, re_rule in ipairs(conf.allowlist) do
+            ok, err = re_compile(re_rule, "j")
+            if not ok then
+                return false, err
+            end
+        end
+    end
+
+    if conf.denylist then
+        for _, re_rule in ipairs(conf.denylist) do
+            ok, err = re_compile(re_rule, "j")
+            if not ok then
+                return false, err
+            end
+        end
+    end
+
     return true
 end
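
The schema check added above rejects malformed patterns at configuration time rather than on the request path. A minimal sketch of validating a list of user-supplied regexes with `re_match_compile` (OpenResty/LuaJIT only; the `rules` list is example data, and the second entry is deliberately broken):

```lua
local re_compile = require("resty.core.regex").re_match_compile

-- example patterns; the second one is intentionally invalid
local rules = { "curl/.*", "(unbalanced" }

local function validate(list)
    for _, re_rule in ipairs(list) do
        local ok, err = re_compile(re_rule, "j")
        if not ok then
            return false, "invalid regex '" .. re_rule .. "': " .. tostring(err)
        end
    end
    return true
end

local ok, err = validate(rules)
print(ok, err)   -- false   invalid regex '(unbalanced': ...
```
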
 
diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua
index 06f903e..97e6552 100644
--- a/apisix/plugins/udp-logger.lua
+++ b/apisix/plugins/udp-logger.lua
@@ -22,7 +22,7 @@ local tostring = tostring
 local buffers = {}
 local ngx = ngx
 local udp = ngx.socket.udp
-local ipairs   = ipairs
+local pairs = pairs
 local stale_timer_running = false;
 local timer_at = ngx.timer.at
 
@@ -90,7 +90,7 @@ local function remove_stale_objects(premature)
         return
     end
 
-    for key, batch in ipairs(buffers) do
+    for key, batch in pairs(buffers) do
         if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
             core.log.warn("removing batch processor stale object, conf: ",
                           core.json.delay_encode(key))
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index adddf15..62b0836 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -237,7 +237,7 @@ local health_checker = {
                         },
                         successes = {
                             type = "integer",
-                            minimum = 1,
+                            minimum = 0,
                             maximum = 254,
                             default = 5
                         }
@@ -259,24 +259,38 @@ local health_checker = {
                         },
                         tcp_failures = {
                             type = "integer",
-                            minimum = 1,
+                            minimum = 0,
                             maximum = 254,
                             default = 2
                         },
                         timeouts = {
                             type = "integer",
-                            minimum = 1,
+                            minimum = 0,
                             maximum = 254,
                             default = 7
                         },
                         http_failures = {
                             type = "integer",
-                            minimum = 1,
+                            minimum = 0,
                             maximum = 254,
                             default = 5
                         },
                     }
                 }
+            },
+            default = {
+                type = "http",
+                healthy = {
+                    http_statuses = { 200, 201, 202, 203, 204, 205, 206, 207, 208, 226,
+                                      300, 301, 302, 303, 304, 305, 306, 307, 308 },
+                    successes = 0,
+                },
+                unhealthy = {
+                    http_statuses = { 429, 500, 503 },
+                    tcp_failures = 0,
+                    timeouts = 0,
+                    http_failures = 0,
+                },
             }
         }
     },
diff --git a/apisix/stream/plugins/mqtt-proxy.lua b/apisix/stream/plugins/mqtt-proxy.lua
index f5df120..e829388 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -67,27 +67,39 @@ function _M.check_schema(conf)
 end
 
 
-local function parse_mqtt(data)
-    local res = {}
-    res.packet_type_flags_byte = str_byte(data, 1, 1)
-    if res.packet_type_flags_byte < 16 or res.packet_type_flags_byte > 32 then
-        return nil, "Received unexpected MQTT packet type+flags: "
-                    .. res.packet_type_flags_byte
-    end
-
-    local parsed_pos = 1
-    res.remaining_len = 0
+local function decode_variable_byte_int(data, offset)
     local multiplier = 1
-    for i = 2, 5 do
-        parsed_pos = i
+    local len = 0
+    local pos
+    for i = offset, offset + 3 do
+        pos = i
         local byte = str_byte(data, i, i)
-        res.remaining_len = res.remaining_len + bit.band(byte, 127) * multiplier
+        len = len + bit.band(byte, 127) * multiplier
         multiplier = multiplier * 128
         if bit.band(byte, 128) == 0 then
             break
         end
     end
 
+    return len, pos
+end
+
+
+local function parse_msg_hdr(data)
+    local packet_type_flags_byte = str_byte(data, 1, 1)
+    if packet_type_flags_byte < 16 or packet_type_flags_byte > 32 then
+        return nil, nil,
+            "Received unexpected MQTT packet type+flags: " .. packet_type_flags_byte
+    end
+
+    local len, pos = decode_variable_byte_int(data, 2)
+    return len, pos
+end
+
+
+local function parse_mqtt(data, parsed_pos)
+    local res = {}
+
     local protocol_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256
                          + str_byte(data, parsed_pos + 2, parsed_pos + 2)
     parsed_pos = parsed_pos + 2
@@ -96,10 +108,15 @@ local function parse_mqtt(data)
 
     res.protocol_ver = str_byte(data, parsed_pos + 1, parsed_pos + 1)
     parsed_pos = parsed_pos + 1
-    if res.protocol_ver == 4 then
-        parsed_pos = parsed_pos + 3
-    elseif res.protocol_ver == 5 then
-        parsed_pos = parsed_pos + 9
+
+    -- skip control flags & keepalive
+    parsed_pos = parsed_pos + 3
+
+    if res.protocol_ver == 5 then
+        -- skip properties
+        local property_len
+        property_len, parsed_pos = decode_variable_byte_int(data, parsed_pos + 1)
+        parsed_pos = parsed_pos + property_len
     end
 
     local client_id_len = str_byte(data, parsed_pos + 1, parsed_pos + 1) * 256
@@ -111,7 +128,13 @@ local function parse_mqtt(data)
         return res
     end
 
-    res.client_id = str_sub(data, parsed_pos + 1, parsed_pos + client_id_len)
+    if client_id_len == 0 then
+        -- A Server MAY allow a Client to supply a ClientID that has a length of zero bytes
+        res.client_id = ""
+    else
+        res.client_id = str_sub(data, parsed_pos + 1, parsed_pos + client_id_len)
+    end
+
     parsed_pos = parsed_pos + client_id_len
 
     res.expect_len = parsed_pos
@@ -120,34 +143,32 @@ end
 
 
 function _M.preread(conf, ctx)
-    core.log.warn("plugin rewrite phase, conf: ", core.json.encode(conf))
-    -- core.log.warn(" ctx: ", core.json.encode(ctx, true))
     local sock = ngx.req.socket()
-    local data, err = sock:peek(16)
+    -- the header format of MQTT CONNECT can be found in
+    -- https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033
+    local data, err = sock:peek(5)
     if not data then
-        core.log.error("failed to read first 16 bytes: ", err)
+        core.log.error("failed to read the msg header: ", err)
         return 503
     end
 
-    local res, err = parse_mqtt(data)
-    if not res then
-        core.log.error("failed to parse the first 16 bytes: ", err)
+    local remain_len, pos, err = parse_msg_hdr(data)
+    if not remain_len then
+        core.log.error("failed to parse the msg header: ", err)
         return 503
     end
 
-    if res.expect_len > #data then
-        data, err = sock:peek(res.expect_len)
-        if not data then
-            core.log.error("failed to read ", res.expect_len, " bytes: ", err)
-            return 503
-        end
+    local data, err = sock:peek(pos + remain_len)
+    if not data then
+        core.log.error("failed to read the Connect Command: ", err)
+        return 503
+    end
 
-        res = parse_mqtt(data)
-        if res.expect_len > #data then
-            core.log.error("failed to parse mqtt request, expect len: ",
-                           res.expect_len, " but got ", #data)
-            return 503
-        end
+    local res = parse_mqtt(data, pos)
+    if res.expect_len > #data then
+        core.log.error("failed to parse mqtt request, expect len: ",
+                        res.expect_len, " but got ", #data)
+        return 503
     end
 
     if res.protocol and res.protocol ~= conf.protocol_name then
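
The new `decode_variable_byte_int` helper implements MQTT's variable byte integer encoding: each byte carries 7 bits of the value and the high bit signals that another byte follows. A standalone sketch of the same decoding, using plain arithmetic instead of the `bit` library so it runs in any Lua:

```lua
-- decode an MQTT variable byte integer starting at `offset`
local function decode_variable_byte_int(data, offset)
    local multiplier = 1
    local len = 0
    local pos
    for i = offset, offset + 3 do
        pos = i
        local byte = string.byte(data, i, i)
        len = len + (byte % 128) * multiplier   -- low 7 bits carry the value
        multiplier = multiplier * 128
        if byte < 128 then                      -- high bit clear: last byte
            break
        end
    end
    return len, pos
end

-- 0xC1 0x02 -> 0x41 + (0x02 * 128) = 65 + 256 = 321
local len, pos = decode_variable_byte_int(string.char(0xC1, 0x02), 1)
print(len, pos)   -- 321   2
```
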
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index cf3fc22..ebdc3dd 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -17,7 +17,11 @@
 local core = require("apisix.core")
 local ngx  = ngx
 local pairs = pairs
+local ngx_now = ngx.now
+local os_date = os.date
 local str_byte = string.byte
+local math_floor = math.floor
+local ngx_update_time = ngx.update_time
 local req_get_body_data = ngx.req.get_body_data
 
 local lru_log_format = core.lrucache.new({
@@ -153,4 +157,13 @@ function _M.get_req_original(ctx, conf)
 end
 
 
+function _M.get_rfc3339_zulu_timestamp(timestamp)
+    ngx_update_time()
+    local now = timestamp or ngx_now()
+    local second = math_floor(now)
+    local millisecond = math_floor((now - second) * 1000)
+    return os_date("!%Y-%m-%dT%T.", second) .. core.string.format("%03dZ", millisecond)
+end
+
+
 return _M
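
The new `get_rfc3339_zulu_timestamp` splits a fractional epoch time into seconds and milliseconds and renders it as an RFC 3339 UTC timestamp. A standalone sketch of the same formatting over a fixed timestamp, so it runs without the ngx API:

```lua
-- format a fractional Unix timestamp as e.g. 2022-01-05T02:52:45.500Z
local function rfc3339_zulu(now)
    local second = math.floor(now)
    local millisecond = math.floor((now - second) * 1000)
    return os.date("!%Y-%m-%dT%T.", second) .. string.format("%03dZ", millisecond)
end

print(rfc3339_zulu(1641351165.5))   -- 2022-01-05T02:52:45.500Z
```
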
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 5232a28..bf20942 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -253,6 +253,7 @@ nginx_config:                     # config for render the template to generate n
       jwks: 1m
       introspection: 10m
       access-tokens: 1m
+      ext-plugin: 1m
 
 etcd:
   host:                           # it's possible to define multiple etcd hosts addresses of the same etcd cluster.
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index 3c2105b..cfc1759 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -1,5 +1,5 @@
 {
-  "version": "2.10.2",
+  "version": "2.10.3",
   "sidebar": [
     {
       "type": "category",
diff --git a/docs/en/latest/health-check.md b/docs/en/latest/health-check.md
index dc53096..850a96e 100644
--- a/docs/en/latest/health-check.md
+++ b/docs/en/latest/health-check.md
@@ -55,11 +55,11 @@ it whether this unique node is healthy or not.
 | upstream.checks.active.unhealthy.tcp_failures   | Active check (unhealthy node)  | integer    | `1` to `254`         | 2                                                                                             | Active check (unhealthy node) TCP type check, determine the number of times that the node is not healthy.            |
 | upstream.checks.active.unhealthy.timeouts       | Active check (unhealthy node)  | integer    | `1` to `254`         | 3                                                                                             | Active check (unhealthy node) to determine the number of timeouts for unhealthy nodes.                              |
 | upstream.checks.passive.healthy.http_statuses   | Passive check (healthy node)   | array      | `200` to `599`       | [200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 300, 301, 302, 303, 304, 305, 306, 307, 308] | Passive check (healthy node) HTTP or HTTPS type check, the HTTP status code of the healthy node.                     |
-| upstream.checks.passive.healthy.successes       | Passive check (healthy node)   | integer    | `1` to `254`         | 5                                                                                             | Passive checks (healthy node) determine the number of times a node is healthy.                                       |
+| upstream.checks.passive.healthy.successes       | Passive check (healthy node)   | integer    | `0` to `254`         | 5                                                                                             | Passive checks (healthy node) determine the number of times a node is healthy.                                       |
 | upstream.checks.passive.unhealthy.http_statuses | Passive check (unhealthy node) | array      | `200` to `599`       | [429, 500, 503]                                                                               | Passive check (unhealthy node) HTTP or HTTPS type check, the HTTP status code of the non-healthy node.               |
-| upstream.checks.passive.unhealthy.tcp_failures  | Passive check (unhealthy node) | integer    | `1` to `254`         | 2                                                                                             | Passive check (unhealthy node) When TCP type is checked, determine the number of times that the node is not healthy. |
-| upstream.checks.passive.unhealthy.timeouts      | Passive check (unhealthy node) | integer    | `1` to `254`         | 7                                                                                             | Passive checks (unhealthy node) determine the number of timeouts for unhealthy nodes.                                |
-| upstream.checks.passive.unhealthy.http_failures | Passive check (unhealthy node) | integer    | `1` to `254`         | 5                                                                                             | Passive check (unhealthy node) The number of times that the node is not healthy during HTTP or HTTPS type checking.  |
+| upstream.checks.passive.unhealthy.tcp_failures  | Passive check (unhealthy node) | integer    | `0` to `254`         | 2                                                                                             | Passive check (unhealthy node) When TCP type is checked, determine the number of times that the node is not healthy. |
+| upstream.checks.passive.unhealthy.timeouts      | Passive check (unhealthy node) | integer    | `0` to `254`         | 7                                                                                             | Passive checks (unhealthy node) determine the number of timeouts for unhealthy nodes.                                |
+| upstream.checks.passive.unhealthy.http_failures | Passive check (unhealthy node) | integer    | `0` to `254`         | 5                                                                                             | Passive check (unhealthy node) The number of times that the node is not healthy during HTTP or HTTPS type checking.  |
 
 ### Configuration example
 
diff --git a/docs/en/latest/how-to-build.md b/docs/en/latest/how-to-build.md
index ff8ea58..2a45f0a 100644
--- a/docs/en/latest/how-to-build.md
+++ b/docs/en/latest/how-to-build.md
@@ -58,7 +58,7 @@ sudo yum install -y https://repos.apiseven.com/packages/centos/apache-apisix-rep
 This installation method is suitable for CentOS 7, please run the following command to install Apache APISIX.
 
 ```shell
-sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2.10.2-0.el7.x86_64.rpm
+sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2.10.3-0.el7.x86_64.rpm
 ```
 
 ### Installation via Docker
@@ -71,16 +71,16 @@ Please refer to: [Installing Apache APISIX with Helm Chart](https://github.com/a
 
 ### Installation via Source Release Package
 
-1. Create a directory named `apisix-2.10.2`.
+1. Create a directory named `apisix-2.10.3`.
 
   ```shell
-  mkdir apisix-2.10.2
+  mkdir apisix-2.10.3
   ```
 
 2. Download Apache APISIX Release source package.
 
   ```shell
-  wget https://downloads.apache.org/apisix/2.10.2/apache-apisix-2.10.2-src.tgz
+  wget https://downloads.apache.org/apisix/2.10.3/apache-apisix-2.10.3-src.tgz
   ```
 
   You can also download the Apache APISIX Release source package from the Apache APISIX website. The [Apache APISIX Official Website - Download Page](https://apisix.apache.org/downloads/) also provides source packages for Apache APISIX, APISIX Dashboard and APISIX Ingress Controller.
@@ -88,14 +88,14 @@ Please refer to: [Installing Apache APISIX with Helm Chart](https://github.com/a
 3. Unzip the Apache APISIX Release source package.
 
   ```shell
-  tar zxvf apache-apisix-2.10.2-src.tgz -C apisix-2.10.2
+  tar zxvf apache-apisix-2.10.3-src.tgz -C apisix-2.10.3
   ```
 
 4. Install the runtime dependent Lua libraries.
 
   ```shell
-  # Switch to the apisix-2.10.2 directory
-  cd apisix-2.10.2
+  # Switch to the apisix-2.10.3 directory
+  cd apisix-2.10.3
   # Create dependencies
   make deps
   # Install apisix command
diff --git a/docs/zh/latest/CHANGELOG.md b/docs/zh/latest/CHANGELOG.md
index 031cd23..1617b4e 100644
--- a/docs/zh/latest/CHANGELOG.md
+++ b/docs/zh/latest/CHANGELOG.md
@@ -23,6 +23,7 @@ title: CHANGELOG
 
 ## Table of Contents
 
+- [2.10.3](#2103)
 - [2.10.2](#2102)
 - [2.10.1](#2101)
 - [2.10.0](#2100)
@@ -48,6 +49,26 @@ title: CHANGELOG
 - [0.7.0](#070)
 - [0.6.0](#060)
 
+## 2.10.3
+
+### Bugfix
+
+- 修正插件加载错误日志拼接方式 [#5540](https://github.com/apache/apisix/pull/5540)
+- 修正插件加载时忽略 /apisix/plugins/ 改变 [#5558](https://github.com/apache/apisix/pull/5558)
+- 修正改变被动健康检查后的无效错误 [#5589](https://github.com/apache/apisix/pull/5589)
+- 修正 batch-processor 中未被正确释放的陈旧对象 [#5700](https://github.com/apache/apisix/pull/5700)
+- 通过 math.randomseed 补丁提供更具随机性的随机数生成 [#5682](https://github.com/apache/apisix/pull/5682)
+- 修正 log-rotate 插件开启压缩后的日志收集异常 [#5715](https://github.com/apache/apisix/pull/5715)
+- 避免 ext-plugin 多次发送配置数据 [#5183](https://github.com/apache/apisix/pull/5183)
+- 在 ext-plugin 中使用锁确保总是从共享存储中获取 token [#5263](https://github.com/apache/apisix/pull/5263)
+- 修正 ext-plugin 使用陈旧的 key [#5782](https://github.com/apache/apisix/pull/5782)
+- 允许在使用 mqtt-proxy 插件时 client id 为空的情况 [#5816](https://github.com/apache/apisix/pull/5816)
+- 修正 sls-logger 插件中日志不支持毫秒级时间戳 [#5820](https://github.com/apache/apisix/pull/5820)
+- 完善 ua-restriction 插件中配置检查逻辑 [#5728](https://github.com/apache/apisix/pull/5728)
+- 在 cors 插件中防止 Origin 被修改 [#5890](https://github.com/apache/apisix/pull/5890)
+- 确保使用 proxy-rewrite 插件时会更新 core.request.header 的缓存 [#5914](https://github.com/apache/apisix/pull/5914)
+- 修正 mqtt-proxy 插件中对 MQTT5 的属性处理 [#5916](https://github.com/apache/apisix/pull/5916)
+
 ## 2.10.2
 
 ### Bugfix
diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json
index 2479f81..4203546 100644
--- a/docs/zh/latest/config.json
+++ b/docs/zh/latest/config.json
@@ -1,5 +1,5 @@
 {
-  "version": "2.10.2",
+  "version": "2.10.3",
   "sidebar": [
     {
       "type": "category",
diff --git a/docs/zh/latest/health-check.md b/docs/zh/latest/health-check.md
index 4e3f992..a6d2ecd 100644
--- a/docs/zh/latest/health-check.md
+++ b/docs/zh/latest/health-check.md
@@ -53,11 +53,11 @@ Apache APISIX 的健康检查使用 [lua-resty-healthcheck](https://github.com/K
 | upstream.checks.active.unhealthy.tcp_failures   | 主动检查(非健康节点) | integer | `1` 至 `254`        | 2                                                                                             | 主动检查(非健康节点)TCP 类型检查时,确定节点非健康的次数。 |
 | upstream.checks.active.unhealthy.timeouts       | 主动检查(非健康节点) | integer | `1` 至 `254`        | 3                                                                                             | 主动检查(非健康节点)确定节点非健康的超时次数。  |
 | upstream.checks.passive.healthy.http_statuses   | 被动检查(健康节点) | array   | `200` 至 `599`      | [200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 300, 301, 302, 303, 304, 305, 306, 307, 308] | 被动检查(健康节点) HTTP 或 HTTPS 类型检查时,健康节点的HTTP状态码。 |
-| upstream.checks.passive.healthy.successes       | 被动检查(健康节点) | integer | `1` 至 `254`        | 5                                                                                             | 被动检查(健康节点)确定节点健康的次数。              |
+| upstream.checks.passive.healthy.successes       | 被动检查(健康节点) | integer | `0` 至 `254`        | 5                                                                                             | 被动检查(健康节点)确定节点健康的次数。              |
 | upstream.checks.passive.unhealthy.http_statuses | 被动检查(非健康节点) | array   | `200` 至 `599`      | [429, 500, 503]                                                                               | 被动检查(非健康节点) HTTP 或 HTTPS 类型检查时,非健康节点的HTTP状态码。 |
-| upstream.checks.passive.unhealthy.tcp_failures  | 被动检查(非健康节点) | integer | `1` 至 `254`        | 2                                                                                             | 被动检查(非健康节点)TCP 类型检查时,确定节点非健康的次数。 |
-| upstream.checks.passive.unhealthy.timeouts      | 被动检查(非健康节点) | integer | `1` 至 `254`        | 7                                                                                             | 被动检查(非健康节点)确定节点非健康的超时次数。  |
-| upstream.checks.passive.unhealthy.http_failures | 被动检查(非健康节点) | integer | `1` 至 `254`        | 5                                                                                             | 被动检查(非健康节点)HTTP 或 HTTPS 类型检查时,确定节点非健康的次数。 |
+| upstream.checks.passive.unhealthy.tcp_failures  | 被动检查(非健康节点) | integer | `0` 至 `254`        | 2                                                                                             | 被动检查(非健康节点)TCP 类型检查时,确定节点非健康的次数。 |
+| upstream.checks.passive.unhealthy.timeouts      | 被动检查(非健康节点) | integer | `0` 至 `254`        | 7                                                                                             | 被动检查(非健康节点)确定节点非健康的超时次数。  |
+| upstream.checks.passive.unhealthy.http_failures | 被动检查(非健康节点) | integer | `0` 至 `254`        | 5                                                                                             | 被动检查(非健康节点)HTTP 或 HTTPS 类型检查时,确定节点非健康的次数。 |
 
 ### 配置示例:
 
diff --git a/docs/zh/latest/how-to-build.md b/docs/zh/latest/how-to-build.md
index 738f737..96856d3 100644
--- a/docs/zh/latest/how-to-build.md
+++ b/docs/zh/latest/how-to-build.md
@@ -58,7 +58,7 @@ sudo yum install -y https://repos.apiseven.com/packages/centos/apache-apisix-rep
 这种安装方式适用于 CentOS 7 操作系统,请运行以下命令安装 Apache APISIX。
 
 ```shell
-sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2.10.2-0.el7.x86_64.rpm
+sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2.10.3-0.el7.x86_64.rpm
 ```
 
 ### 通过 Docker 安装
@@ -71,16 +71,16 @@ sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2
 
 ### 通过源码包安装
 
-1. 创建一个名为 `apisix-2.10.2` 的目录。
+1. 创建一个名为 `apisix-2.10.3` 的目录。
 
   ```shell
-  mkdir apisix-2.10.2
+  mkdir apisix-2.10.3
   ```
 
 2. 下载 Apache APISIX Release 源码包:
 
   ```shell
-  wget https://downloads.apache.org/apisix/2.10.2/apache-apisix-2.10.2-src.tgz
+  wget https://downloads.apache.org/apisix/2.10.3/apache-apisix-2.10.3-src.tgz
   ```
 
   您也可以通过 Apache APISIX 官网下载 Apache APISIX Release 源码包。 Apache APISIX 官网也提供了 Apache APISIX、APISIX Dashboard 和 APISIX Ingress Controller 的源码包,详情请参考[Apache APISIX 官网-下载页](https://apisix.apache.org/zh/downloads)。
@@ -88,14 +88,14 @@ sudo yum install -y https://repos.apiseven.com/packages/centos/7/x86_64/apisix-2
 3. 解压 Apache APISIX Release 源码包:
 
   ```shell
-  tar zxvf apache-apisix-2.10.2-src.tgz -C apisix-2.10.2
+  tar zxvf apache-apisix-2.10.3-src.tgz -C apisix-2.10.3
   ```
 
 4. 安装运行时依赖的 Lua 库:
 
   ```shell
-  # 切换到 apisix-2.10.2 目录
-  cd apisix-2.10.2
+  # 切换到 apisix-2.10.3 目录
+  cd apisix-2.10.3
   # 安装依赖
   LUAROCKS_SERVER=https://luarocks.cn make deps
   # 安装 apisix 命令
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 5b960cc..83e28c8 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -445,6 +445,7 @@ _EOC_
     lua_shared_dict plugin-api-breaker 10m;
     lua_capture_error_log 1m;    # plugin error-log-logger
     lua_shared_dict etcd-cluster-health-check 10m; # etcd health check
+    lua_shared_dict ext-plugin 1m;
 
     proxy_ssl_name \$upstream_host;
     proxy_ssl_server_name on;
diff --git a/t/admin/health-check.t b/t/admin/health-check.t
index 22d0e5e..f4246f3 100644
--- a/t/admin/health-check.t
+++ b/t/admin/health-check.t
@@ -52,6 +52,15 @@ add_block_preprocessor(sub {
 _EOC_
 
     $block->set_value("init_by_lua_block", $init_by_lua_block);
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
 });
 
 run_tests;
@@ -90,12 +99,8 @@ __DATA__
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -137,12 +142,8 @@ passed
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -166,13 +167,9 @@ passed
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"healthy\" validation failed: property \"successes\" validation failed: expected 255 to be smaller than 254"}
---- no_error_log
-[error]
 
 
 
@@ -196,13 +193,9 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"healthy\" validation failed: property \"successes\" validation failed: expected 0 to be greater than 1"}
---- no_error_log
-[error]
 
 
 
@@ -226,13 +219,9 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"passive\" validation failed: property \"unhealthy\" validation failed: property \"http_statuses\" validation failed: failed to validate item 2: expected 600 to be smaller than 599"}
---- no_error_log
-[error]
 
 
 
@@ -254,13 +243,9 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"type\" validation failed: matches none of the enum values"}
---- no_error_log
-[error]
 
 
 
@@ -284,13 +269,9 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"healthy\" validation failed: property \"http_statuses\" validation failed: expected unique items but items 1 and 2 are equal"}
---- no_error_log
-[error]
 
 
 
@@ -314,13 +295,9 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"unhealthy\" validation failed: property \"http_failures\" validation failed: wrong type: expected integer, got number"}
---- no_error_log
-[error]
 
 
 
@@ -353,12 +330,8 @@ GET /t
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -391,12 +364,8 @@ passed
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -429,13 +398,9 @@ passed
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: property \"active\" validation failed: property \"req_headers\" validation failed: failed to validate item 2: wrong type: expected string, got number"}
---- no_error_log
-[error]
 
 
 
@@ -469,17 +434,64 @@ GET /t
             ngx.print(body)
         }
     }
---- request
-GET /t
 --- error_code: 400
 --- response_body
 {"error_msg":"invalid configuration: property \"upstream\" validation failed: property \"checks\" validation failed: object matches none of the requireds: [\"active\"] or [\"active\",\"passive\"]"}
---- no_error_log
-[error]
 
 
 
-=== TEST 13: number type timeout
+=== TEST 13: only active
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+
+            req_data.upstream.checks = json.decode([[{
+                "active": {
+                    "http_path": "/status",
+                    "host": "foo.com",
+                    "healthy": {
+                        "interval": 2,
+                        "successes": 1
+                    },
+                    "unhealthy": {
+                        "interval": 1,
+                        "http_failures": 2
+                    }
+                }
+            }]])
+            exp_data.node.value.upstream.checks.active = req_data.upstream.checks.active
+            exp_data.node.value.upstream.checks.passive = {
+                type = "http",
+                healthy = {
+                    http_statuses = { 200, 201, 202, 203, 204, 205, 206, 207, 208, 226,
+                                      300, 301, 302, 303, 304, 305, 306, 307, 308 },
+                    successes = 0,
+                },
+                unhealthy = {
+                    http_statuses = { 429, 500, 503 },
+                    tcp_failures = 0,
+                    timeouts = 0,
+                    http_failures = 0,
+                }
+            }
+
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                req_data,
+                exp_data
+            )
+
+            ngx.status = code
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 14: number type timeout
 --- config
     location /t {
         content_by_lua_block {
@@ -512,9 +524,5 @@ GET /t
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
diff --git a/t/cli/test_admin.sh b/t/cli/test_admin.sh
index ac691b1..ecfbffc 100755
--- a/t/cli/test_admin.sh
+++ b/t/cli/test_admin.sh
@@ -229,3 +229,114 @@ fi
 make stop
 
 echo "pass: sync /apisix/plugins from etcd when disabling admin successfully"
+
+
+
+# ignore changes to /apisix/plugins/ due to init_etcd
+echo '
+apisix:
+  enable_admin: false
+plugins:
+  - node-status
+nginx_config:
+  error_log_level:  info
+' > conf/config.yaml
+
+rm logs/error.log
+make init
+make run
+
+# first time check node status api
+code=$(curl -v -k -i -m 20 -o /dev/null -s -w %{http_code} http://127.0.0.1:9080/apisix/status)
+if [ ! $code -eq 200 ]; then
+    echo "failed: first time check node status api failed"
+    exit 1
+fi
+
+# mock another instance init etcd dir
+make init
+sleep 1
+
+# second time check node status api
+code=$(curl -v -k -i -m 20 -o /dev/null -s -w %{http_code} http://127.0.0.1:9080/apisix/status)
+if [ ! $code -eq 200 ]; then
+    echo "failed: second time check node status api failed"
+    exit 1
+fi
+
+make stop
+
+echo "pass: ignore changes to /apisix/plugins/ due to init_etcd successfully"
+
+
+# accept changes to /apisix/plugins when enable_admin is false
+echo '
+apisix:
+  enable_admin: false
+plugins:
+  - node-status
+stream_plugins:
+' > conf/config.yaml
+
+rm logs/error.log
+make init
+make run
+
+# first time check node status api
+code=$(curl -v -k -i -m 20 -o /dev/null -s -w %{http_code} http://127.0.0.1:9080/apisix/status)
+if [ ! $code -eq 200 ]; then
+    echo "failed: first time check node status api failed"
+    exit 1
+fi
+
+sleep 0.5
+
+# check http plugins load list
+if ! grep -E 'new plugins: {"node-status":true}' logs/error.log; then
+    echo "failed: first time load http plugins list failed"
+    exit 1
+fi
+
+# check stream plugins (no stream plugins yet; one will be added below)
+if ! grep -E 'failed to read stream plugin list from local file' logs/error.log; then
+    echo "failed: first time load stream plugins list failed"
+    exit 1
+fi
+
+# mock another instance add /apisix/plugins
+res=$(etcdctl put "/apisix/plugins" '[{"name":"node-status"},{"name":"example-plugin"},{"stream":true,"name":"mqtt-proxy"}]')
+if [[ $res != "OK" ]]; then
+    echo "failed: failed to set /apisix/plugins to add more plugins"
+    exit 1
+fi
+
+sleep 0.5
+
+# second time check node status api
+code=$(curl -v -k -i -m 20 -o /dev/null -s -w %{http_code} http://127.0.0.1:9080/apisix/status)
+if [ ! $code -eq 200 ]; then
+    echo "failed: second time check node status api failed"
+    exit 1
+fi
+
+# check http plugins load list
+if ! grep -E 'new plugins: {"node-status":true}' logs/error.log; then
+    echo "failed: second time load http plugins list failed"
+    exit 1
+fi
+
+# check stream plugins load list
+if ! grep -E 'new plugins: {.*example-plugin' logs/error.log; then
+    echo "failed: second time load stream plugins list failed"
+    exit 1
+fi
+
+
+if grep -E 'new plugins: {}' logs/error.log; then
+    echo "failed: second time load plugins list failed"
+    exit 1
+fi
+
+make stop
+
+echo "pass: ccept changes to /apisix/plugins successfully"
diff --git a/t/lib/ext-plugin.lua b/t/lib/ext-plugin.lua
index c005c80..f38951a 100644
--- a/t/lib/ext-plugin.lua
+++ b/t/lib/ext-plugin.lua
@@ -54,7 +54,7 @@ end
 
 
 function _M.go(case)
-    local sock = ngx.req.socket()
+    local sock = ngx.req.socket(true)
     local ty, data = ext.receive(sock)
     if not ty then
         ngx.log(ngx.ERR, data)
diff --git a/t/plugin/cors.t b/t/plugin/cors.t
index eef790d..072dee1 100644
--- a/t/plugin/cors.t
+++ b/t/plugin/cors.t
@@ -927,3 +927,72 @@ Access-Control-Max-Age:
 Access-Control-Allow-Credentials:
 --- no_error_log
 [error]
+
+
+
+=== TEST 34: origin was modified by the proxy_rewrite plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "cors": {
+                            "allow_origins": "http://sub.domain.com",
+                            "allow_methods": "GET,POST",
+                            "allow_headers": "headr1,headr2",
+                            "expose_headers": "ex-headr1,ex-headr2",
+                            "max_age": 50,
+                            "allow_credential": true
+                        },
+                        "proxy-rewrite": {
+                            "headers": {
+                                  "Origin": "http://example.com"
+                            }
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 35: CORS origin check is not affected by the proxy-rewrite plugin
+--- request
+GET /hello HTTP/1.1
+--- more_headers
+Origin: http://sub.domain.com
+resp-vary: Via
+--- response_body
+hello world
+--- response_headers
+Access-Control-Allow-Origin: http://sub.domain.com
+Vary: Via, Origin
+Access-Control-Allow-Methods: GET,POST
+Access-Control-Allow-Headers: headr1,headr2
+Access-Control-Expose-Headers: ex-headr1,ex-headr2
+Access-Control-Max-Age: 50
+Access-Control-Allow-Credentials: true
+--- no_error_log
+[error]
diff --git a/t/plugin/ext-plugin/conf_token.t b/t/plugin/ext-plugin/conf_token.t
new file mode 100644
index 0000000..58a8b13
--- /dev/null
+++ b/t/plugin/ext-plugin/conf_token.t
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+workers(3);
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();
+log_level("info");
+worker_connections(1024);
+
+$ENV{"PATH"} = $ENV{PATH} . ":" . $ENV{TEST_NGINX_HTML_DIR};
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    $block->set_value("stream_conf_enable", 1);
+
+    if (!defined $block->extra_stream_config) {
+        my $stream_config = <<_EOC_;
+    server {
+        listen unix:\$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({})
+        }
+    }
+
+_EOC_
+        $block->set_value("extra_stream_config", $stream_config);
+    }
+
+    my $unix_socket_path = $ENV{"TEST_NGINX_HTML_DIR"} . "/nginx.sock";
+    my $orig_extra_yaml_config = $block->extra_yaml_config // "";
+    my $cmd = $block->ext_plugin_cmd // "['sleep', '5s']";
+    my $extra_yaml_config = <<_EOC_;
+ext-plugin:
+    path_for_test: $unix_socket_path
+    cmd: $cmd
+_EOC_
+    $extra_yaml_config = $extra_yaml_config . $orig_extra_yaml_config;
+
+    $block->set_value("extra_yaml_config", $extra_yaml_config);
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!$block->error_log) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "ext-plugin-pre-req": {"a":"b"}
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: share the conf token across different workers
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+
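+            -- issue concurrent requests so multiple workers handle them and
+            -- share the conf token via the shared dict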
+            local t = {}
+            for i = 1, 16 do
+                local th = assert(ngx.thread.spawn(function(i)
+                    local httpc = http.new()
+                    local res, err = httpc:request_uri(uri)
+                    if not res then
+                        ngx.log(ngx.ERR, err)
+                        return
+                    end
+                end, i))
+                table.insert(t, th)
+            end
+            for i, th in ipairs(t) do
+                ngx.thread.wait(th)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- grep_error_log eval
+qr/fetch token from shared dict, token: 233/
+--- grep_error_log_out eval
+qr/(fetch token from shared dict, token: 233){1,}/
+--- no_error_log
+[error]
diff --git a/t/plugin/ext-plugin/sanity.t b/t/plugin/ext-plugin/sanity.t
index 75b01f9..2a5e965 100644
--- a/t/plugin/ext-plugin/sanity.t
+++ b/t/plugin/ext-plugin/sanity.t
@@ -270,6 +270,7 @@ sending rpc type: 1 data length:
 receiving rpc type: 1 data length:
 --- error_log
 flush conf token lrucache
+flush conf token in shared dict
 --- no_error_log
 [error]
 
@@ -382,6 +383,7 @@ hello world
     }
 --- error_log
 refresh cache and try again
+flush conf token in shared dict
 --- no_error_log
 [error]
 
diff --git a/t/plugin/proxy-rewrite2.t b/t/plugin/proxy-rewrite2.t
index e3d1b7b..4fbfe55 100644
--- a/t/plugin/proxy-rewrite2.t
+++ b/t/plugin/proxy-rewrite2.t
@@ -175,3 +175,36 @@ GET /echo
 X-Forwarded-Proto: grpc
 --- response_headers
 X-Forwarded-Proto: https
+
+
+
+=== TEST 6: make sure X-Forwarded-Proto hits the `core.request.header` cache
+--- apisix_yaml
+routes:
+  -
+    id: 1
+    uri: /echo
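+    # reading a header in the rewrite phase warms the core.request.header cache;
+    # the upstream must still receive the rewritten X-Forwarded-Proto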
+    plugins:
+        serverless-pre-function:
+            phase: rewrite
+            functions:
+              - return function(conf, ctx) local core = require("apisix.core"); ngx.log(ngx.ERR, core.request.header(ctx, "host")); end
+        proxy-rewrite:
+            headers:
+                X-Forwarded-Proto: https-rewrite
+    upstream_id: 1
+upstreams:
+  -
+    id: 1
+    nodes:
+        "127.0.0.1:1980": 1
+    type: roundrobin
+#END
+--- request
+GET /echo
+--- more_headers
+X-Forwarded-Proto: grpc
+--- response_headers
+X-Forwarded-Proto: https-rewrite
+--- error_log
+localhost
diff --git a/t/plugin/sls-logger.t b/t/plugin/sls-logger.t
index 7e8b1eb..296372c 100644
--- a/t/plugin/sls-logger.t
+++ b/t/plugin/sls-logger.t
@@ -19,7 +19,21 @@ use t::APISIX 'no_plan';
 repeat_each(1);
 no_long_string();
 no_root_location();
-run_tests;
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+});
+
+run_tests();
 
 __DATA__
 
@@ -37,12 +51,8 @@ __DATA__
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 done
---- no_error_log
-[error]
 
 
 
@@ -60,13 +70,9 @@ done
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 property "access_key_secret" is required
 done
---- no_error_log
-[error]
 
 
 
@@ -84,13 +90,9 @@ done
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 property "timeout" validation failed: wrong type: expected integer, got string
 done
---- no_error_log
-[error]
 
 
 
@@ -155,12 +157,8 @@ done
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -169,8 +167,6 @@ passed
 GET /hello
 --- response_body
 hello world
---- no_error_log
-[error]
 --- wait: 1
 
 
@@ -188,9 +184,43 @@ hello world
             ngx.say(data)
         }
     }
---- request
-GET /t
 --- response_body
 123
---- no_error_log
-[error]
+
+
+
+=== TEST 7: sls logger timestamp contains milliseconds
+--- config
+    location /t {
+        content_by_lua_block {
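+            -- pull the RFC3339 timestamp (the second space-separated field) out of
+            -- the syslog entry and return its millisecond digits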
+            local function get_syslog_timestamp_millisecond(log_entry)
+                local first_idx = string.find(log_entry, " ") + 1
+                local last_idx2 = string.find(log_entry, " ", first_idx)
+                local rfc3339_date = string.sub(log_entry, first_idx, last_idx2)
+                local rfc3339_len = string.len(rfc3339_date)
+                local rfc3339_millisecond = string.sub(rfc3339_date, rfc3339_len - 4, rfc3339_len - 2)
+                return tonumber(rfc3339_millisecond)
+            end
+
+            math.randomseed(os.time())
+            local rfc5424 = require("apisix.plugins.slslog.rfc5424")
+            local m = 0
+            -- the millisecond part returned by `ngx.now` may be 0,
+            -- so run the check several times to keep the test reliable
+            for i = 1, 5 do
+                ngx.sleep(string.format("%0.3f", math.random()))
+                local log_entry = rfc5424.encode("SYSLOG", "INFO", "localhost", "apisix",
+                                                 123456, "apisix.apache.org", "apisix.apache.log",
+                                                 "apisix.sls.logger", "BD274822-96AA-4DA6-90EC-15940FB24444",
+                                                 "hello world")
+                m = get_syslog_timestamp_millisecond(log_entry) + m
+            end
+
+            if m > 0 then
+                ngx.say("passed")
+            end
+        }
+    }
+--- response_body
+passed
+--- timeout: 5
diff --git a/t/plugin/ua-restriction.t b/t/plugin/ua-restriction.t
index 77c6874..82e6658 100644
--- a/t/plugin/ua-restriction.t
+++ b/t/plugin/ua-restriction.t
@@ -739,3 +739,57 @@ hello world
     }
 --- response_body
 passed
+
+
+
+=== TEST 33: the element in the allowlist is null
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.ua-restriction")
+            local conf = {
+                allowlist = {
+                    "userdata: NULL",
+                    null,
+                    nil,
+                    ""
+                },
+            }
+            local ok, err = plugin.check_schema(conf)
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "allowlist" validation failed: wrong type: expected array, got table
+done
+
+
+
+=== TEST 34: the element in the denylist is null
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.ua-restriction")
+            local conf = {
+                denylist = {
+                    "userdata: NULL",
+                    null,
+                    nil,
+                    ""
+                },
+            }
+            local ok, err = plugin.check_schema(conf)
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "denylist" validation failed: wrong type: expected array, got table
+done
diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t
index 5e74823..3ae675a 100644
--- a/t/stream-plugin/mqtt-proxy.t
+++ b/t/stream-plugin/mqtt-proxy.t
@@ -264,3 +264,138 @@ passed
 "\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
 --- error_log
 failed to parse domain: loc, error:
+--- timeout: 10
+
+
+
+=== TEST 11: set route with upstream
+--- config
+    location /t {
+        content_by_lua_block {
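+            -- stream route on port 1985 proxies MQTT CONNECT to the upstream on 127.0.0.1:1995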
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "server_port": 1985,
+                    "plugins": {
+                        "mqtt-proxy": {
+                            "protocol_name": "MQTT",
+                            "protocol_level": 4,
+                            "upstream": {
+                                "ip": "127.0.0.1",
+                                "port": 1995
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: hit route
+--- stream_enable
+--- stream_request eval
+"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
+--- stream_response
+hello world
+--- grep_error_log eval
+qr/mqtt client id: \w+/
+--- grep_error_log_out
+mqtt client id: foo
+--- no_error_log
+[error]
+
+
+
+=== TEST 13: hit route with empty client id
+--- stream_enable
+--- stream_request eval
+"\x10\x0c\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x00"
+--- stream_response
+hello world
+--- grep_error_log eval
+qr/mqtt client id: \w+/
+--- grep_error_log_out
+--- no_error_log
+[error]
+
+
+
+=== TEST 14: MQTT 5
+--- config
+    location /t {
+        content_by_lua_block {
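+            -- same route, but protocol_level 5 so MQTT 5 CONNECT properties are parsed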
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "server_port": 1985,
+                    "plugins": {
+                        "mqtt-proxy": {
+                            "protocol_name": "MQTT",
+                            "protocol_level": 5,
+                            "upstream": {
+                                "ip": "127.0.0.1",
+                                "port": 1995
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 15: hit route with empty property
+--- stream_enable
+--- stream_request eval
+"\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00"
+--- stream_response
+hello world
+--- grep_error_log eval
+qr/mqtt client id: \w+/
+--- grep_error_log_out
+--- no_error_log
+[error]
+
+
+
+=== TEST 16: hit route with property
+--- stream_enable
+--- stream_request eval
+"\x10\x1b\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x05\x11\x00\x00\x0e\x10\x00\x09\x63\x6c\x69\x6e\x74\x2d\x31\x31\x31"
+--- stream_response
+hello world
+--- grep_error_log eval
+qr/mqtt client id: \S+/
+--- grep_error_log_out
+mqtt client id: clint-111
+--- no_error_log
+[error]