You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/12 02:09:32 UTC

[apisix] branch master updated: feat(xRPC): support log filter (#6960)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 935e62f22 feat(xRPC): support log filter (#6960)
935e62f22 is described below

commit 935e62f225c399d1ed9b819e19375c3eb5338461
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Thu May 12 10:09:27 2022 +0800

    feat(xRPC): support log filter (#6960)
---
 apisix/schema_def.lua                              |  22 +
 apisix/stream/xrpc/runner.lua                      |  55 ++
 apisix/stream/xrpc/sdk.lua                         |   3 +
 t/xds-library/config_xds_2.t                       |   2 -
 .../apisix/stream/xrpc/protocols/pingpong/init.lua |   1 +
 t/xrpc/pingpong2.t                                 | 613 +++++++++++++++++++++
 6 files changed, 694 insertions(+), 2 deletions(-)

diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua
index 96cc11eaa..fbeef89bb 100644
--- a/apisix/schema_def.lua
+++ b/apisix/schema_def.lua
@@ -807,6 +807,28 @@ local xrpc_protocol_schema = {
             description = "protocol-specific configuration",
             type = "object",
         },
+        logger = {
+            type = "array",
+            items = {
+                properties = {
+                    name = {
+                        type = "string",
+                    },
+                    filter = {
+                        description = "logger filter rules",
+                        type = "array",
+                    },
+                    conf = {
+                        description = "logger plugin configuration",
+                        type = "object",
+                    },
+                },
+                dependencies = {
+                    name = {"conf"},
+                },
+            },
+        },
+
     },
     required = {"name"}
 }
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index ea807f60a..22cb62d03 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -14,14 +14,22 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local require = require
 local core = require("apisix.core")
+local expr = require("resty.expr.v1")
 local pairs = pairs
 local ngx = ngx
 local ngx_now = ngx.now
 local OK = ngx.OK
 local DECLINED = ngx.DECLINED
 local DONE = ngx.DONE
+local pcall = pcall
+local ipairs = ipairs
+local tostring = tostring
 
+local logger_expr_cache = core.lrucache.new({  -- holds expr.new results for logger filters
+    ttl = 300, count = 1024
+})
 
 local _M = {}
 
@@ -71,9 +79,56 @@ local function put_req_ctx(session, ctx)
 end
 
 
+local function filter_logger(ctx, logger)  -- returns whether `logger` should run for ctx
+    if not logger then
+       return false
+    end
+    -- `filter` is optional: a nil or empty rule list means there is nothing to evaluate
+    if not logger.filter or #logger.filter == 0 then
+        -- no filter rules configured: execute the logger plugin by default
+        return true
+    end
+    -- expr.new is invoked through logger_expr_cache; finish_req sets ctx.conf_id
+    local version = tostring(logger.filter)
+    local filter_expr, err = logger_expr_cache(ctx.conf_id, version, expr.new, logger.filter)
+    if not filter_expr or err then
+        core.log.error("failed to validate the 'filter' expression: ", err)
+        return false
+    end
+    return filter_expr:eval(ctx)
+end
+
+
+local function run_log_plugin(ctx, logger)  -- call the `log` hook of the plugin logger.name
+    local pkg_name = "apisix.stream.plugins." .. logger.name
+    local ok, plugin = pcall(require, pkg_name)  -- on failure `plugin` is the error value
+    if not ok then
+        core.log.error("failed to load plugin [", logger.name, "] err: ", plugin)
+        return
+    end
+    -- a plugin module without a `log` function is skipped without any message
+    local log_func = plugin.log
+    if log_func then
+        log_func(logger.conf, ctx)
+    end
+end
+
+
 local function finish_req(protocol, session, ctx)
     ctx._rpc_end_time = ngx_now()
 
+    local loggers = session.route.protocol.logger  -- optional per-route logger list
+    if loggers and #loggers > 0 then
+        for _, logger in ipairs(loggers) do
+            ctx.conf_id = tostring(logger.conf)  -- read by filter_logger as cache input
+            local matched = filter_logger(ctx, logger)
+            core.log.info("log filter: ", logger.name, " filter result: ", matched)
+            if matched then
+                run_log_plugin(ctx, logger)
+            end
+        end
+    end
+    -- logger plugins run first, then the protocol's own log hook, then put_req_ctx
     protocol.log(session, ctx)
     put_req_ctx(session, ctx)
 end
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index 3e3f4557a..eb70a4018 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -108,6 +108,9 @@ function _M.get_req_ctx(session, id)
     local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
     -- fields start with '_' should not be accessed by the protocol implementation
     ctx._id = id
+    core.ctx.set_vars_meta(ctx)
+    ctx.conf_type = "xrpc-" .. session.route.protocol.name .. "-logger"
+
     session._ctxs[id] = ctx
 
     ctx._rpc_start_time = ngx_now()
diff --git a/t/xds-library/config_xds_2.t b/t/xds-library/config_xds_2.t
index 85f9e0de3..67629d4bc 100644
--- a/t/xds-library/config_xds_2.t
+++ b/t/xds-library/config_xds_2.t
@@ -205,8 +205,6 @@ decode the conf of [/routes/3] failed, err: Expected object key string but found
                 }
             }
             local data_str = core.json.encode(data)
-            ngx.log(ngx.WARN, "data_str : ", require("inspect")(data_str))
-
             ngx.shared["xds-config"]:set("/routes/3", data_str)
             ngx.update_time()
             ngx.shared["xds-config-version"]:set("version", ngx.now())
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index 1487044ff..212cd6302 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -160,6 +160,7 @@ function _M.from_downstream(session, downstream)
     if typ == TYPE_UNARY_DYN_UP then
         ctx.len = ctx.len + 4
     end
+
     return OK, ctx
 end
 
diff --git a/t/xrpc/pingpong2.t b/t/xrpc/pingpong2.t
index 3c7152c5b..93c57bd27 100644
--- a/t/xrpc/pingpong2.t
+++ b/t/xrpc/pingpong2.t
@@ -86,6 +86,18 @@ _EOC_
         $block->set_value("no_error_log", "[error]\nRPC is not finished");
     }
 
+    if (!defined $block->extra_stream_config) {
+        my $stream_config = <<_EOC_;
+    server {
+        listen 8125 udp;
+        content_by_lua_block {
+            require("lib.mock_layer4").dogstatsd()
+        }
+    }
+_EOC_
+        $block->set_value("extra_stream_config", $stream_config);
+    }
+
     $block;
 });
 
@@ -139,3 +151,604 @@ lua tcp socket send timeout: 60000
 stream lua tcp socket read timeout: 60000
 --- log_level: debug
 --- stream_conf_enable
+
+
+
+=== TEST 3: bad logger filter
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {}
+                                },
+                                conf = {}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 4: failed to validate the 'filter' expression
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+failed to validate the 'filter' expression: rule too short
+
+
+
+=== TEST 5: set logger filter (single rule)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 10}
+                                },
+                                conf = {}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 6: log filter matched successfully
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: true
+
+
+
+=== TEST 7: update logger filter
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", "<", 10}
+                                },
+                                conf = {}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 8: failed to match log filter
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: false
+
+
+
+=== TEST 9: set logger filter (multiple rules)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 12},
+                                    {"len", "<", 14}
+                                },
+                                conf = {}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 10: log filter matched successfully
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: true
+
+
+
+=== TEST 11: update logger filter
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", "<", 10},
+                                    {"len", ">", 12}
+                                },
+                                conf = {}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 12: failed to match log filter
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- error_log
+log filter: syslog filter result: false
+
+
+
+=== TEST 13: set custom log format
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/syslog',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "client_ip": "$remote_addr"
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 14: no logger filter, logger plugin executed by default
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 8125,
+                                    sock_type = "udp",
+                                    batch_max_size = 1,
+                                    flush_limit = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 15: verify the data received by the log server
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 16: set logger filter
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 10}
+                                },
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 8125,
+                                    sock_type = "udp",
+                                    batch_max_size = 1,
+                                    flush_limit = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 17: verify the data received by the log server
+--- request eval
+"POST /t
+" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- stream_conf_enable
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 18: small flush_limit, instant flush
+--- stream_conf_enable
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 10}
+                                },
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 5044,
+                                    batch_max_size = 1,
+                                    flush_limit = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+
+            -- wait etcd sync
+            ngx.sleep(0.5)
+
+            local sock = ngx.socket.tcp()
+            sock:settimeout(1000)
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.log(ngx.ERR, "failed to connect: ", err)
+                return ngx.exit(503)
+            end
+
+            assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+
+            while true do
+                local data, err = sock:receiveany(4096)
+                if not data then
+                    sock:close()
+                    break
+                end
+                ngx.print(data)
+            end
+            -- wait flush log
+            ngx.sleep(2.5)
+        }
+    }
+--- request
+GET /t
+--- response_body eval
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- timeout: 5
+--- error_log
+try to lock with key xrpc-pingpong-logger#table
+unlock with key xrpc-pingpong-logger#table
+
+
+
+=== TEST 19: check plugin configuration updating
+--- stream_conf_enable
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 10}
+                                },
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 5044,  -- first logger target
+                                    batch_max_size = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.status = 503  -- `code` holds the successful admin status here
+                ngx.say("fail")
+                return
+            end
+            assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+            -- one read is taken as the reply; response_body expects the request echoed
+            local body1, err = sock:receiveany(4096)
+            sock:close()
+            if not body1 then
+                ngx.status = 503
+                ngx.say("failed to receive: ", err)
+                return
+            end
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "pingpong",
+                        logger = {
+                            {
+                                name = "syslog",
+                                filter = {
+                                    {"len", ">", 10}
+                                },
+                                conf = {
+                                    host = "127.0.0.1",
+                                    port = 5045,  -- only the port changes
+                                    batch_max_size = 1
+                                }
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1995"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.status = 503  -- `code` holds the successful admin status here
+                ngx.say("fail")
+                return
+            end
+            assert(sock:send("pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"))
+            -- same single-read handling as for the first request
+            local body2, err = sock:receiveany(4096)
+            sock:close()
+            if not body2 then
+                ngx.status = 503
+                ngx.say("failed to receive: ", err)
+                return
+            end
+            ngx.print(body1)
+            ngx.print(body2)
+        }
+    }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body eval
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" .
+"pp\x02\x00\x00\x00\x00\x00\x00\x03ABC"
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045