You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/05/06 02:08:03 UTC

[apisix] branch master updated: feat(ext-plugin): implement the http-req-call protocol (#4183)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new ffed438  feat(ext-plugin): implement the http-req-call protocol (#4183)
ffed438 is described below

commit ffed4383f14dd9de4118511ec4e6edff53965e03
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Thu May 6 10:07:56 2021 +0800

    feat(ext-plugin): implement the http-req-call protocol (#4183)
---
 apisix/core/request.lua             |  28 ++
 apisix/plugins/ext-plugin/init.lua  | 283 +++++++++++++++++++-
 rockspec/apisix-master-0.rockspec   |   2 +-
 t/lib/ext-plugin.lua                | 214 +++++++++++++++
 t/lib/server.lua                    |   6 +-
 t/plugin/ext-plugin/http-req-call.t | 517 ++++++++++++++++++++++++++++++++++++
 6 files changed, 1038 insertions(+), 12 deletions(-)

diff --git a/apisix/core/request.lua b/apisix/core/request.lua
index f359460..e43a647 100644
--- a/apisix/core/request.lua
+++ b/apisix/core/request.lua
@@ -29,6 +29,8 @@ local io_open  = io.open
 local req_read_body = ngx.req.read_body
 local req_get_body_data = ngx.req.get_body_data
 local req_get_body_file = ngx.req.get_body_file
+local req_get_uri_args = ngx.req.get_uri_args
+local req_set_uri_args = ngx.req.set_uri_args
 
 
 local _M = {}
@@ -122,6 +124,32 @@ function _M.get_remote_client_port(ctx)
 end
 
 
+function _M.get_uri_args(ctx)
+    if not ctx then  -- no ctx given: use the one stored in ngx.ctx.api_ctx
+        ctx = ngx.ctx.api_ctx
+    end
+
+    if not ctx.req_uri_args then  -- parse once, then reuse the table cached on ctx
+        -- pass 0 to avoid a truncated result and to keep the behavior
+        -- the same as on other platforms
+        local args = req_get_uri_args(0)
+        ctx.req_uri_args = args
+    end
+
+    return ctx.req_uri_args
+end
+
+
+function _M.set_uri_args(ctx, args)
+    if not ctx then  -- no ctx given: use the one stored in ngx.ctx.api_ctx
+        ctx = ngx.ctx.api_ctx
+    end
+
+    ctx.req_uri_args = nil  -- drop the cache so get_uri_args() re-reads the new args
+    return req_set_uri_args(args)
+end
+
+
 local function get_file(file_name)
     local f, err = io_open(file_name, 'r')
     if not f then
diff --git a/apisix/plugins/ext-plugin/init.lua b/apisix/plugins/ext-plugin/init.lua
index 9dde610..f37ab81 100644
--- a/apisix/plugins/ext-plugin/init.lua
+++ b/apisix/plugins/ext-plugin/init.lua
@@ -16,8 +16,14 @@
 --
 local is_http = ngx.config.subsystem == "http"
 local flatbuffers = require("flatbuffers")
+local a6_method = require("A6.Method")
 local prepare_conf_req = require("A6.PrepareConf.Req")
 local prepare_conf_resp = require("A6.PrepareConf.Resp")
+local http_req_call_req = require("A6.HTTPReqCall.Req")
+local http_req_call_resp = require("A6.HTTPReqCall.Resp")
+local http_req_call_action = require("A6.HTTPReqCall.Action")
+local http_req_call_stop = require("A6.HTTPReqCall.Stop")
+local http_req_call_rewrite = require("A6.HTTPReqCall.Rewrite")
 local text_entry = require("A6.TextEntry")
 local err_resp = require("A6.Err.Resp")
 local err_code = require("A6.Err.Code")
@@ -35,16 +41,23 @@ local band = bit.band
 local lshift = bit.lshift
 local rshift = bit.rshift
 local ffi = require("ffi")
+local ffi_new = ffi.new
 local ffi_str = ffi.string
 local socket_tcp = ngx.socket.tcp
-local str_byte = string.byte
-local str_format = string.format
+local worker_id = ngx.worker.id
 local ngx_timer_at = ngx.timer.at
 local exiting = ngx.worker.exiting
+local str_byte = string.byte
+local str_format = string.format
+local str_lower = string.lower
+local str_sub = string.sub
 local error = error
-local events_list
+local ipairs = ipairs
+local pairs = pairs
+local type = type
 
 
+local events_list
 local lrucache = core.lrucache.new({
     type = "plugin",
     ttl = helper.get_conf_token_cache_time(),
@@ -171,6 +184,80 @@ end
 _M.receive = receive
 
 
+local generate_id
+do
+    local count = 0  -- sequence counter shared by all calls, wraps at MAX_COUNT
+    local MAX_COUNT = lshift(1, 22)  -- 2^22: count always fits in the low 22 bits
+
+    function generate_id()
+        local wid = worker_id()
+        local id = lshift(wid, 22) + count  -- worker id above bit 22, counter below
+        count = count + 1
+        if count == MAX_COUNT then
+            count = 0
+        end
+        return id
+    end
+end
+
+
+local encode_a6_method
+do
+    local map = {  -- HTTP method name -> A6.Method enum value
+        GET = a6_method.GET,
+        HEAD = a6_method.HEAD,
+        POST = a6_method.POST,
+        PUT = a6_method.PUT,
+        DELETE = a6_method.DELETE,
+        MKCOL = a6_method.MKCOL,
+        COPY = a6_method.COPY,
+        MOVE = a6_method.MOVE,
+        OPTIONS = a6_method.OPTIONS,
+        PROPFIND = a6_method.PROPFIND,
+        PROPPATCH = a6_method.PROPPATCH,
+        LOCK = a6_method.LOCK,
+        UNLOCK = a6_method.UNLOCK,
+        PATCH = a6_method.PATCH,
+        TRACE = a6_method.TRACE,
+    }
+
+    function encode_a6_method(name)
+        return map[name]  -- nil when the name is not one of the keys listed above
+    end
+end
+
+
+local function build_args(builder, key, val)  -- returns the offset of a new TextEntry
+    local name = builder:CreateString(key)
+    local value
+    if val ~= true then  -- strings are created before text_entry.Start() is called
+        value = builder:CreateString(val)
+    end
+
+    text_entry.Start(builder)
+    text_entry.AddName(builder, name)
+    if val ~= true then  -- a `true` val gets a name-only entry, Value is left unset
+        text_entry.AddValue(builder, value)
+    end
+    return text_entry.End(builder)
+end
+
+
+local function build_headers(var, builder, key, val)  -- returns a TextEntry offset
+    if key == "host" then  -- send var.upstream_host instead of the received value
+        val = var.upstream_host
+    end
+
+    local name = builder:CreateString(key)
+    local value = builder:CreateString(val)
+
+    text_entry.Start(builder)
+    text_entry.AddName(builder, name)
+    text_entry.AddValue(builder, value)
+    return text_entry.End(builder)
+end
+
+
 local rpc_call
 local rpc_handlers = {
     nil,
@@ -232,8 +319,90 @@ local rpc_handlers = {
             return nil, err
         end
 
-        local req = "hello"
-        local ok, err = send(sock, constants.RPC_HTTP_REQ_CALL, req)
+        builder:Clear()
+        local var = ctx.var
+
+        local uri
+        if var.upstream_uri == "" then
+            -- use original uri instead of rewritten one
+            uri = var.uri
+        else
+            uri = var.upstream_uri
+
+            -- the rewritten one may contain new args
+            local index = core.string.find(uri, "?")
+            if index then
+                local raw_uri = uri
+                uri = str_sub(raw_uri, 1, index - 1)
+                core.request.set_uri_args(ctx, str_sub(raw_uri, index + 1))
+            end
+        end
+
+        local path = builder:CreateString(uri)
+
+        local bin_addr = var.binary_remote_addr
+        local len = #bin_addr
+        http_req_call_req.StartSrcIpVector(builder, len)
+        for i = len, 1, -1 do
+            builder:PrependByte(str_byte(bin_addr, i))
+        end
+        local src_ip = builder:EndVector(len)
+
+        local args = core.request.get_uri_args(ctx)
+        local textEntries = {}
+        for key, val in pairs(args) do
+            local ty = type(val)
+            if ty == "table" then
+                for _, v in ipairs(val) do
+                    core.table.insert(textEntries, build_args(builder, key, v))
+                end
+            else
+                core.table.insert(textEntries, build_args(builder, key, val))
+            end
+        end
+        local len = #textEntries
+        http_req_call_req.StartArgsVector(builder, len)
+        for i = len, 1, -1 do
+            builder:PrependUOffsetTRelative(textEntries[i])
+        end
+        local args_vec = builder:EndVector(len)
+
+        local hdrs = core.request.headers(ctx)
+        core.table.clear(textEntries)
+        for key, val in pairs(hdrs) do
+            local ty = type(val)
+            if ty == "table" then
+                for _, v in ipairs(val) do
+                    core.table.insert(textEntries, build_headers(var, builder, key, v))
+                end
+            else
+                core.table.insert(textEntries, build_headers(var, builder, key, val))
+            end
+        end
+        local len = #textEntries
+        http_req_call_req.StartHeadersVector(builder, len)
+        for i = len, 1, -1 do
+            builder:PrependUOffsetTRelative(textEntries[i])
+        end
+        local hdrs_vec = builder:EndVector(len)
+
+        local id = generate_id()
+        local method = var.method
+
+        http_req_call_req.Start(builder)
+        http_req_call_req.AddId(builder, id)
+        http_req_call_req.AddConfToken(builder, token)
+        http_req_call_req.AddSrcIp(builder, src_ip)
+        http_req_call_req.AddPath(builder, path)
+        http_req_call_req.AddArgs(builder, args_vec)
+        http_req_call_req.AddHeaders(builder, hdrs_vec)
+        http_req_call_req.AddMethod(builder, encode_a6_method(method))
+        -- TODO: handle extraInfo
+
+        local req = http_req_call_req.End(builder)
+        builder:Finish(req)
+
+        local ok, err = send(sock, constants.RPC_HTTP_REQ_CALL, builder:Output())
         if not ok then
             return nil, "failed to send RPC_HTTP_REQ_CALL: " .. err
         end
@@ -247,7 +416,95 @@ local rpc_handlers = {
             return nil, "failed to receive RPC_HTTP_REQ_CALL: unexpected type " .. ty
         end
 
-        core.log.warn(resp)
+        local buf = flatbuffers.binaryArray.New(resp)
+        local call_resp = http_req_call_resp.GetRootAsResp(buf, 0)
+        local action_type = call_resp:ActionType()
+
+        if action_type == http_req_call_action.Stop then
+            local action = call_resp:Action()
+            local stop = http_req_call_stop.New()
+            stop:Init(action.bytes, action.pos)
+
+            local len = stop:HeadersLength()
+            if len > 0 then
+                for i = 1, len do
+                    local entry = stop:Headers(i)
+                    core.response.set_header(entry:Name(), entry:Value())
+                end
+            end
+
+            local body
+            local len = stop:BodyLength()
+            if len > 0 then
+                -- TODO: support empty body
+                body = ffi_new("unsigned char[?]", len)
+                for i = 1, len do
+                    body[i - 1] = stop:Body(i)
+                end
+                body = ffi_str(body, len)
+            end
+            return true, nil, stop:Status(), body
+        end
+
+        if action_type == http_req_call_action.Rewrite then
+            ctx.request_rewritten = constants.REWRITTEN_BY_EXT_PLUGIN
+
+            local action = call_resp:Action()
+            local rewrite = http_req_call_rewrite.New()
+            rewrite:Init(action.bytes, action.pos)
+
+            local path = rewrite:Path()
+            if path then
+                path = core.utils.uri_safe_encode(path)
+                var.upstream_uri = path
+            end
+
+            local len = rewrite:HeadersLength()
+            if len > 0 then
+                for i = 1, len do
+                    local entry = rewrite:Headers(i)
+                    local name = entry:Name()
+                    core.request.set_header(ctx, name, entry:Value())
+
+                    if str_lower(name) == "host" then
+                        var.upstream_host = entry:Value()
+                    end
+                end
+            end
+
+            local len = rewrite:ArgsLength()
+            if len > 0 then
+                local changed = {}
+                for i = 1, len do
+                    local entry = rewrite:Args(i)
+                    local name = entry:Name()
+                    local value = entry:Value()
+                    if value == nil then
+                        args[name] = nil
+
+                    else
+                        if changed[name] then
+                            if type(args[name]) == "table" then
+                                core.table.insert(args[name], value)
+                            else
+                                args[name] = {args[name], entry:Value()}
+                            end
+                        else
+                            args[name] = entry:Value()
+                        end
+
+                        changed[name] = true
+                    end
+                end
+
+                core.request.set_uri_args(ctx, args)
+
+                if path then
+                    var.upstream_uri = path .. '?' .. var.args
+                end
+            end
+        end
+
         return true
     end,
 }
@@ -263,8 +520,8 @@ rpc_call = function (ty, conf, ctx)
         return nil, "failed to connect to the unix socket " .. path .. ": " .. err
     end
 
-    local ok, err = rpc_handlers[ty + 1](conf, ctx, sock)
-    if not ok then
+    local res, err, code, body = rpc_handlers[ty + 1](conf, ctx, sock)
+    if not res then
         sock:close()
         return nil, err
     end
@@ -273,16 +530,22 @@ rpc_call = function (ty, conf, ctx)
     if not ok then
         core.log.info("failed to setkeepalive: ", err)
     end
-    return true
+
+    return res, nil, code, body
 end
 
 
 function _M.communicate(conf, ctx)
-    local ok, err = rpc_call(constants.RPC_HTTP_REQ_CALL, conf, ctx)
+    local ok, err, code, body = rpc_call(constants.RPC_HTTP_REQ_CALL, conf, ctx)
     if not ok then
         core.log.error(err)
         return 503
     end
+
+    if code then  -- set only by the Stop action: pass its status and body back
+        return code, body
+    end
+    return
 end
 
 
diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec
index 8c536e3..bc1082f 100644
--- a/rockspec/apisix-master-0.rockspec
+++ b/rockspec/apisix-master-0.rockspec
@@ -66,7 +66,7 @@ dependencies = {
     "luasec = 0.9-1",
     "lua-resty-consul = 0.3-2",
     "penlight = 1.9.2-1",
-    "ext-plugin-proto = 0.1.0",
+    "ext-plugin-proto = 0.1.1",
 }
 
 build = {
diff --git a/t/lib/ext-plugin.lua b/t/lib/ext-plugin.lua
index be7f6e5..5ef8eb9 100644
--- a/t/lib/ext-plugin.lua
+++ b/t/lib/ext-plugin.lua
@@ -14,6 +14,7 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local json = require("toolkit.json")
 local ext = require("apisix.plugins.ext-plugin.init")
 local constants = require("apisix.constants")
 local flatbuffers = require("flatbuffers")
@@ -21,12 +22,26 @@ local err_code = require("A6.Err.Code")
 local err_resp = require("A6.Err.Resp")
 local prepare_conf_req = require("A6.PrepareConf.Req")
 local prepare_conf_resp = require("A6.PrepareConf.Resp")
+local a6_method = require("A6.Method")
+local text_entry = require("A6.TextEntry")
+local http_req_call_req = require("A6.HTTPReqCall.Req")
+local http_req_call_resp = require("A6.HTTPReqCall.Resp")
+local http_req_call_action = require("A6.HTTPReqCall.Action")
+local http_req_call_stop = require("A6.HTTPReqCall.Stop")
+local http_req_call_rewrite = require("A6.HTTPReqCall.Rewrite")
 
 
 local _M = {}
 local builder = flatbuffers.Builder(0)
 
 
+local function build_action(action, ty)  -- opens the Resp table; _M.go calls End() later
+    http_req_call_resp.Start(builder)
+    http_req_call_resp.AddActionType(builder, ty)
+    http_req_call_resp.AddAction(builder, action)
+end
+
+
 function _M.go(case)
     local sock = ngx.req.socket()
     local ty, data = ext.receive(sock)
@@ -68,6 +83,205 @@ function _M.go(case)
         end
     end
 
+    if ty == constants.RPC_HTTP_REQ_CALL then
+        local buf = flatbuffers.binaryArray.New(data)
+        local call_req = http_req_call_req.GetRootAsReq(buf, 0)
+        if case.check_input then
+            assert(call_req:Id() == 0)
+            assert(call_req:ConfToken() == 233)
+            assert(call_req:SrcIpLength() == 4)
+            assert(call_req:SrcIp(1) == 127)
+            assert(call_req:SrcIp(2) == 0)
+            assert(call_req:SrcIp(3) == 0)
+            assert(call_req:SrcIp(4) == 1)
+            assert(call_req:Method() == a6_method.PUT)
+            assert(call_req:Path() == "/hello")
+
+            assert(call_req:ArgsLength() == 4)
+            local res = {}
+            for i = 1, call_req:ArgsLength() do
+                local entry = call_req:Args(i)
+                local r = res[entry:Name()]
+                if r then
+                    res[entry:Name()] = {r, entry:Value()}
+                else
+                    res[entry:Name()] = entry:Value() or true
+                end
+            end
+            assert(json.encode(res) == '{\"xx\":[\"y\",\"z\"],\"y\":\"\",\"z\":true}')
+
+            assert(call_req:HeadersLength() == 5)
+            local res = {}
+            for i = 1, call_req:HeadersLength() do
+                local entry = call_req:Headers(i)
+                local r = res[entry:Name()]
+                if r then
+                    res[entry:Name()] = {r, entry:Value()}
+                else
+                    res[entry:Name()] = entry:Value() or true
+                end
+            end
+            assert(json.encode(res) == '{\"connection\":\"close\",\"host\":\"localhost\",' ..
+                   '\"x-req\":[\"foo\",\"bar\"],\"x-resp\":\"cat\"}')
+        elseif case.check_input_ipv6 then
+            assert(call_req:SrcIpLength() == 16)
+            for i = 1, 15 do
+                assert(call_req:SrcIp(i) == 0)
+            end
+            assert(call_req:SrcIp(16) == 1)
+        elseif case.check_input_rewrite_host then
+            for i = 1, call_req:HeadersLength() do
+                local entry = call_req:Headers(i)
+                if entry:Name() == "host" then
+                    assert(entry:Value() == "test.com")
+                end
+            end
+        elseif case.check_input_rewrite_path then
+            assert(call_req:Path() == "/xxx")
+        elseif case.check_input_rewrite_args then
+            assert(call_req:Path() == "/xxx")
+            assert(call_req:ArgsLength() == 1)
+            local entry = call_req:Args(1)
+            assert(entry:Name() == "x")
+            assert(entry:Value() == "z")
+        else
+            assert(call_req:Method() == a6_method.GET)
+        end
+
+        if case.stop == true then
+            local len = 3
+            http_req_call_stop.StartBodyVector(builder, len)
+            builder:PrependByte(string.byte("t"))
+            builder:PrependByte(string.byte("a"))
+            builder:PrependByte(string.byte("c"))
+            local b = builder:EndVector(len)
+
+            local hdrs = {
+                {"X-Resp", "foo"},
+                {"X-Req", "bar"},
+            }
+            local len = #hdrs
+            local textEntries = {}
+            for i = 1, len do
+                local name = builder:CreateString(hdrs[i][1])
+                local value = builder:CreateString(hdrs[i][2])
+                text_entry.Start(builder)
+                text_entry.AddName(builder, name)
+                text_entry.AddValue(builder, value)
+                local c = text_entry.End(builder)
+                textEntries[i] = c
+            end
+            http_req_call_stop.StartHeadersVector(builder, len)
+            for i = len, 1, -1 do
+                builder:PrependUOffsetTRelative(textEntries[i])
+            end
+            local vec = builder:EndVector(len)
+
+            http_req_call_stop.Start(builder)
+            http_req_call_stop.AddStatus(builder, 405)
+            http_req_call_stop.AddBody(builder, b)
+            http_req_call_stop.AddHeaders(builder, vec)
+            local action = http_req_call_stop.End(builder)
+            build_action(action, http_req_call_action.Stop)
+
+        elseif case.rewrite == true or case.rewrite_host == true then
+            local hdrs
+            if case.rewrite_host then
+                hdrs = {{"host", "127.0.0.1"}}
+            else
+                hdrs = {
+                    {"X-Delete", nil},
+                    {"X-Change", "bar"},
+                    {"X-Add", "bar"},
+                }
+            end
+
+            local len = #hdrs
+            local textEntries = {}
+            for i = 1, len do
+                local name = builder:CreateString(hdrs[i][1])
+                local value
+                if hdrs[i][2] then
+                    value = builder:CreateString(hdrs[i][2])
+                end
+                text_entry.Start(builder)
+                text_entry.AddName(builder, name)
+                if value then
+                    text_entry.AddValue(builder, value)
+                end
+                local c = text_entry.End(builder)
+                textEntries[i] = c
+            end
+            http_req_call_rewrite.StartHeadersVector(builder, len)
+            for i = len, 1, -1 do
+                builder:PrependUOffsetTRelative(textEntries[i])
+            end
+            local vec = builder:EndVector(len)
+
+            local path = builder:CreateString("/uri")
+
+            http_req_call_rewrite.Start(builder)
+            http_req_call_rewrite.AddPath(builder, path)
+            http_req_call_rewrite.AddHeaders(builder, vec)
+            local action = http_req_call_rewrite.End(builder)
+            build_action(action, http_req_call_action.Rewrite)
+
+        elseif case.rewrite_args == true or case.rewrite_args_only == true then
+            local path = builder:CreateString("/plugin_proxy_rewrite_args")
+
+            local args = {
+                {"a", "foo"},
+                {"d", nil},  -- no value: the entry is built without a Value field
+                {"c", "bar"},
+                {"a", "bar"},  -- same name again, to exercise the multi-value case
+            }
+
+            local len = #args
+            local textEntries = {}
+            for i = 1, len do
+                local name = builder:CreateString(args[i][1])
+                local value
+                if args[i][2] then
+                    value = builder:CreateString(args[i][2])
+                end
+                text_entry.Start(builder)
+                text_entry.AddName(builder, name)
+                if value then
+                    text_entry.AddValue(builder, value)
+                end
+                local c = text_entry.End(builder)
+                textEntries[i] = c
+            end
+            http_req_call_rewrite.StartArgsVector(builder, len)  -- pairs with AddArgs below
+            for i = len, 1, -1 do
+                builder:PrependUOffsetTRelative(textEntries[i])
+            end
+            local vec = builder:EndVector(len)
+
+            http_req_call_rewrite.Start(builder)
+            if not case.rewrite_args_only then  -- "args only" case leaves Path unset
+                http_req_call_rewrite.AddPath(builder, path)
+            end
+            http_req_call_rewrite.AddArgs(builder, vec)
+            local action = http_req_call_rewrite.End(builder)
+            build_action(action, http_req_call_action.Rewrite)
+
+        elseif case.rewrite_bad_path == true then
+            local path = builder:CreateString("/plugin_proxy_rewrite_args?a=2")
+            http_req_call_rewrite.Start(builder)
+            http_req_call_rewrite.AddPath(builder, path)
+            local action = http_req_call_rewrite.End(builder)
+            build_action(action, http_req_call_action.Rewrite)
+
+        else
+            http_req_call_resp.Start(builder)
+        end
+
+        local req = http_req_call_resp.End(builder)
+        builder:Finish(req)
+        data = builder:Output()
+    end
+
     local ok, err = ext.send(sock, ty, data)
     if not ok then
         ngx.log(ngx.ERR, err)
diff --git a/t/lib/server.lua b/t/lib/server.lua
index f8323ee..5a1b454 100644
--- a/t/lib/server.lua
+++ b/t/lib/server.lua
@@ -94,7 +94,11 @@ function _M.plugin_proxy_rewrite_args()
     table.sort(keys)
 
     for _, key in ipairs(keys) do
-        ngx.say(key, ": ", args[key])
+        if type(args[key]) == "table" then
+            ngx.say(key, ": ", table.concat(args[key], ','))
+        else
+            ngx.say(key, ": ", args[key])
+        end
     end
 end
 
diff --git a/t/plugin/ext-plugin/http-req-call.t b/t/plugin/ext-plugin/http-req-call.t
new file mode 100644
index 0000000..76d56b4
--- /dev/null
+++ b/t/plugin/ext-plugin/http-req-call.t
@@ -0,0 +1,517 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();
+log_level("info");
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    $block->set_value("stream_conf_enable", 1);
+
+    if (!defined $block->extra_stream_config) {  # default mock runner: ext.go({})
+        my $stream_config = <<_EOC_;
+    server {
+        listen unix:\$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({})
+        }
+    }
+
+_EOC_
+        $block->set_value("extra_stream_config", $stream_config);
+    }
+
+    my $unix_socket_path = $ENV{"TEST_NGINX_HTML_DIR"} . "/nginx.sock";
+    my $extra_yaml_config = <<_EOC_;
+ext-plugin:
+    path_for_test: $unix_socket_path
+_EOC_
+
+    $block->set_value("extra_yaml_config", $extra_yaml_config);
+
+    if (!$block->request) {  # blocks without a request section default to GET /t
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!$block->error_log) {  # no error_log section: add the no_error_log patterns
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: add route
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "ext-plugin-pre-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: stop
+--- request
+GET /hello
+--- response_body chomp
+cat
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({stop = true})
+        }
+    }
+--- error_code: 405
+--- response_headers
+X-Resp: foo
+X-Req: bar
+
+
+
+=== TEST 3: check input
+--- request
+PUT /hello?xx=y&xx=z&&y=&&z
+--- more_headers
+X-Req: foo
+X-Req: bar
+X-Resp: cat
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({check_input = true})
+        }
+    }
+
+
+
+=== TEST 4: check input (ipv6)
+--- config
+location /t {
+    content_by_lua_block {
+        local t = require("lib.test_admin").test_ipv6
+        t('/hello')
+    }
+}
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({check_input_ipv6 = true})
+        }
+    }
+--- listen_ipv6
+
+
+
+=== TEST 5: rewrite
+--- request
+GET /hello
+--- more_headers
+X-Change: foo
+X-Delete: foo
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({rewrite = true})
+        }
+    }
+--- response_body
+uri: /uri
+host: localhost
+x-add: bar
+x-change: bar
+x-real-ip: 127.0.0.1
+
+
+
+=== TEST 6: rewrite host
+--- request
+GET /hello
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({rewrite_host = true})
+        }
+    }
+--- response_body
+uri: /uri
+host: 127.0.0.1
+x-real-ip: 127.0.0.1
+
+
+
+=== TEST 7: rewrite args
+--- request
+GET /hello?c=foo&d=bar
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({rewrite_args = true})
+        }
+    }
+--- response_body
+uri: /plugin_proxy_rewrite_args
+a: foo,bar
+c: bar
+
+
+
+=== TEST 8: proxy-rewrite + rewrite host
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "proxy-rewrite": {
+                            "host": "test.com"
+                        },
+                        "ext-plugin-post-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: hit
+--- request
+GET /hello
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({rewrite_host = true, check_input_rewrite_host = true})
+        }
+    }
+--- response_body
+uri: /uri
+host: 127.0.0.1
+x-real-ip: 127.0.0.1
+
+
+
+=== TEST 10: proxy-rewrite + rewrite path
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "proxy-rewrite": {
+                            "uri": "/xxx"
+                        },
+                        "ext-plugin-post-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 11: hit
+--- request
+GET /hello
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")  -- presumably a mock plugin runner; verify
+            ext.go({rewrite_host = true, check_input_rewrite_path = true})  -- expected body reports uri /uri
+        }
+    }
+--- response_body
+uri: /uri
+host: 127.0.0.1
+x-real-ip: 127.0.0.1
+
+
+
+=== TEST 12: proxy-rewrite + rewrite path with args
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")  -- NOTE(review): json is unused in this block
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',  -- res is unused below
+                ngx.HTTP_PUT,  -- PUT route: proxy-rewrite uri carries a query string, x=z
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "proxy-rewrite": {
+                            "uri": "/xxx?x=z"
+                        },
+                        "ext-plugin-post-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- status of 300 or above: echo it with the message and stop
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)  -- otherwise print the message; expected body is passed
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 13: hit
+--- request
+GET /hello
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")  -- presumably a mock plugin runner; verify
+            ext.go({rewrite_args = true, check_input_rewrite_args = true})  -- expect a, c and x args
+        }
+    }
+--- response_body
+uri: /plugin_proxy_rewrite_args
+a: foo,bar
+c: bar
+x: z
+
+
+
+=== TEST 14: rewrite args only
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")  -- NOTE(review): json is unused in this block
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',  -- res is unused below
+                ngx.HTTP_PUT,  -- PUT route on /plugin_proxy_rewrite_args, ext-plugin-post-req only
+                 [[{
+                    "uri": "/plugin_proxy_rewrite_args",
+                    "plugins": {
+                        "ext-plugin-post-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- status of 300 or above: echo it with the message and stop
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)  -- otherwise print the message; expected body is passed
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 15: hit
+--- request
+GET /plugin_proxy_rewrite_args
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")  -- presumably a mock plugin runner; verify
+            ext.go({rewrite_args_only = true})  -- expected body below lists args a and c
+        }
+    }
+--- response_body
+uri: /plugin_proxy_rewrite_args
+a: foo,bar
+c: bar
+
+
+
+=== TEST 16: rewrite, bad path
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")  -- NOTE(review): json is unused in this block
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',  -- res is unused below
+                ngx.HTTP_PUT,  -- PUT route on /hello, ext-plugin-post-req only
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "ext-plugin-post-req": {
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- status of 300 or above: echo it with the message and stop
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)  -- otherwise print the message; expected body is passed
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 17: hit
+--- request
+GET /hello
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")  -- presumably a mock plugin runner; verify
+            ext.go({rewrite_bad_path = true})  -- expected below: %3F in the access log and a 404
+        }
+    }
+--- access_log
+GET /plugin_proxy_rewrite_args%3Fa=2
+--- error_code: 404