You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/12 02:09:32 UTC
[apisix] branch master updated: feat(xRPC): support log filter (#6960)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 935e62f22 feat(xRPC): support log filter (#6960)
935e62f22 is described below
commit 935e62f225c399d1ed9b819e19375c3eb5338461
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Thu May 12 10:09:27 2022 +0800
feat(xRPC): support log filter (#6960)
---
apisix/schema_def.lua | 22 +
apisix/stream/xrpc/runner.lua | 55 ++
apisix/stream/xrpc/sdk.lua | 3 +
t/xds-library/config_xds_2.t | 2 -
.../apisix/stream/xrpc/protocols/pingpong/init.lua | 1 +
t/xrpc/pingpong2.t | 613 +++++++++++++++++++++
6 files changed, 694 insertions(+), 2 deletions(-)
diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index 96cc11eaa..fbeef89bb 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -807,6 +807,28 @@ local xrpc_protocol_schema = {
description = "protocol-specific configuration",
type = "object",
},
+ logger = {
+ type = "array",
+ items = {
+ properties = {
+ name = {
+ type = "string",
+ },
+ filter = {
+ description = "logger filter rules",
+ type = "array",
+ },
+ conf = {
+ description = "logger plugin configuration",
+ type = "object",
+ },
+ },
+ dependencies = {
+ name = {"conf"},
+ },
+ },
+ },
+
},
required = {"name"}
}
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index ea807f60a..22cb62d03 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -14,14 +14,22 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
+local require = require
local core = require("apisix.core")
+local expr = require("resty.expr.v1")
local pairs = pairs
local ngx = ngx
local ngx_now = ngx.now
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
+local pcall = pcall
+local ipairs = ipairs
+local tostring = tostring
+local logger_expr_cache = core.lrucache.new({
+ ttl = 300, count = 1024
+})
local _M = {}
@@ -71,9 +79,56 @@ local function put_req_ctx(session, ctx)
end
+local function filter_logger(ctx, logger)
+ if not logger then
+ return false
+ end
+
+ if not logger.filter or #logger.filter == 0 then
+ -- no valid filter, default execution plugin
+ return true
+ end
+
+ local version = tostring(logger.filter)
+ local filter_expr, err = logger_expr_cache(ctx.conf_id, version, expr.new, logger.filter)
+ if not filter_expr or err then
+ core.log.error("failed to validate the 'filter' expression: ", err)
+ return false
+ end
+ return filter_expr:eval(ctx)
+end
+
+
+local function run_log_plugin(ctx, logger)
+ local pkg_name = "apisix.stream.plugins." .. logger.name
+ local ok, plugin = pcall(require, pkg_name)
+ if not ok then
+ core.log.error("failed to load plugin [", logger.name, "] err: ", plugin)
+ return
+ end
+
+ local log_func = plugin.log
+ if log_func then
+ log_func(logger.conf, ctx)
+ end
+end
+
+
local function finish_req(protocol, session, ctx)
ctx._rpc_end_time = ngx_now()
+ local loggers = session.route.protocol.logger
+ if loggers and #loggers > 0 then
+ for _, logger in ipairs(loggers) do
+ ctx.conf_id = tostring(logger.conf)
+ local matched = filter_logger(ctx, logger)
+ core.log.info("log filter: ", logger.name, " filter result: ", matched)
+ if matched then
+ run_log_plugin(ctx, logger)
+ end
+ end
+ end
+
protocol.log(session, ctx)
put_req_ctx(session, ctx)
end
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index 3e3f4557a..eb70a4018 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -108,6 +108,9 @@ function _M.get_req_ctx(session, id)
local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
-- fields start with '_' should not be accessed by the protocol implementation
ctx._id = id
+ core.ctx.set_vars_meta(ctx)
+ ctx.conf_type = "xrpc-" .. session.route.protocol.name .. "-logger"
+
session._ctxs[id] = ctx
ctx._rpc_start_time = ngx_now()
diff --git a/t/xds-library/config_xds_2.t b/t/xds-library/config_xds_2.t
index 85f9e0de3..67629d4bc 100644
--- a/t/xds-library/config_xds_2.t
+++ b/t/xds-library/config_xds_2.t
@@ -205,8 +205,6 @@ decode the conf of [/routes/3] failed, err: Expected object key string but found
}
}
local data_str = core.json.encode(data)
- ngx.log(ngx.WARN, "data_str : ", require("inspect")(data_str))
-
ngx.shared["xds-config"]:set("/routes/3", data_str)
ngx.update_time()
ngx.shared["xds-config-version"]:set("version", ngx.now())
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index 1487044ff..212cd6302 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -160,6 +160,7 @@ function _M.from_downstream(session, downstream)
if typ == TYPE_UNARY_DYN_UP then
ctx.len = ctx.len + 4
end
+
return OK, ctx
end
diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t
index 3c7152c5b..93c57bd27 100644
--- a/t/xrpc/pingpong2.t
+++ b/t/xrpc/pingpong2.t
@@ -86,6 +86,18 @@ _EOC_
$block->set_value("no_error_log", "[error]\nRPC is not finished");
}
+ if (!defined $block->extra_stream_config) {
+ my $stream_config = <<_EOC_;
+ server {
+ listen 8125 udp;
+ content_by_lua_block {
+ require("lib.mock_layer4").dogstatsd()
+ }
+ }
+_EOC_
+ $block->set_value("extra_stream_config", $stream_config);
+ }
+
$block;
});
@@ -139,3 +151,604 @@ lua tcp socket send timeout: 60000
stream lua tcp socket read timeout: 60000
--- log_level: debug
--- stream_conf_enable
+
+
+
+=== TEST 3: bad loggger filter
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {}
+ },
+ conf = {}
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 4: failed to validate the 'filter' expression
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+failed to validate the 'filter' expression: rule too short
+
+
+
+=== TEST 5: set loggger filter(single rule)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 10}
+ },
+ conf = {}
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 6: log filter matched successful
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: true
+
+
+
+=== TEST 7: update loggger filter
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", "<", 10}
+ },
+ conf = {}
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 8: failed to match log filter
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: false
+
+
+
+=== TEST 9: set loggger filter(multiple rules)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 12},
+ {"len", "<", 14}
+ },
+ conf = {}
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 10: log filter matched successful
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: true
+
+
+
+=== TEST 11: update loggger filter
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", "<", 10},
+ {"len", ">", 12}
+ },
+ conf = {}
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 12: failed to match log filter
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: false
+
+
+
+=== TEST 13: set custom log format
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/plugin_metadata/syslog',
+ ngx.HTTP_PUT,
+ [[{
+ "log_format": {
+ "client_ip": "$remote_addr"
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 14: no loggger filter, defaulte executed logger plugin
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ conf = {
+ host = "127.0.0.1",
+ port = 8125,
+ sock_type = "udp",
+ batch_max_size = 1,
+ flush_limit = 1
+ }
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 15: verify the data received by the log server
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 16: set loggger filter
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 10}
+ },
+ conf = {
+ host = "127.0.0.1",
+ port = 8125,
+ sock_type = "udp",
+ batch_max_size = 1,
+ flush_limit = 1
+ }
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 17: verify the data received by the log server
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 18: small flush_limit, instant flush
+--- stream_conf_enable
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 10}
+ },
+ conf = {
+ host = "127.0.0.1",
+ port = 5044,
+ batch_max_size = 1,
+ flush_limit = 1
+ }
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+
+ -- wait etcd sync
+ ngx.sleep(0.5)
+
+ local sock = ngx.socket.tcp()
+ sock:settimeout(1000)
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.log(ngx.ERR, "failed to connect: ", err)
+ return ngx.exit(503)
+ end
+
+ assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+
+ while true do
+ local data, err = sock:receiveany(4096)
+ if not data then
+ sock:close()
+ break
+ end
+ ngx.print(data)
+ end
+ -- wait flush log
+ ngx.sleep(2.5)
+ }
+ }
+--- request
+GET /t
+--- response_body eval
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- timeout: 5
+--- error_log
+try to lock with key xrpc-pingpong-logger#table
+unlock with key xrpc-pingpong-logger#table
+
+
+
+=== TEST 19: check plugin configuration updating
+--- stream_conf_enable
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 10}
+ },
+ conf = {
+ host = "127.0.0.1",
+ port = 5044,
+ batch_max_size = 1
+ }
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+ local sock = ngx.socket.tcp()
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+ assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+ local body1, err
+ while true do
+ body1, err = sock:receiveany(4096)
+ if not data then
+ sock:close()
+ break
+ end
+ end
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "pingpong",
+ logger = {
+ {
+ name = "syslog",
+ filter = {
+ {"len", ">", 10}
+ },
+ conf = {
+ host = "127.0.0.1",
+ port = 5045,
+ batch_max_size = 1
+ }
+ }
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1995"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+ local sock = ngx.socket.tcp()
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+ assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+ local body2, err
+ while true do
+ body2, err = sock:receiveany(4096)
+ if not data then
+ sock:close()
+ break
+ end
+ end
+ ngx.print(body1)
+ ngx.print(body2)
+ }
+ }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body eval
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045