You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/11/26 03:12:58 UTC
[apisix] branch master updated: feat(http/kafka-logger): support to log response body (#5550)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 49a539b feat(http/kafka-logger): support to log response body (#5550)
49a539b is described below
commit 49a539bbb8b97898a468a18b388637df3e7c4878
Author: Daming <zt...@foxmail.com>
AuthorDate: Fri Nov 26 11:12:53 2021 +0800
feat(http/kafka-logger): support to log response body (#5550)
Co-authored-by: 罗泽轩 <sp...@gmail.com>
---
apisix/plugins/http-logger.lua | 23 +++++-
apisix/plugins/kafka-logger.lua | 33 +++++---
apisix/utils/log-util.lua | 57 ++++++++++++-
docs/en/latest/plugins/http-logger.md | 4 +-
docs/en/latest/plugins/kafka-logger.md | 2 +
docs/zh/latest/plugins/http-logger.md | 2 +
docs/zh/latest/plugins/kafka-logger.md | 2 +
t/plugin/http-logger-json.t | 125 +++++++++++++++++++++++++++-
t/plugin/http-logger.t | 36 +++++++++
t/plugin/kafka-logger.t | 143 +++++++++++++++++++++++++++++++++
10 files changed, 412 insertions(+), 15 deletions(-)
diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 3d2e7f1..b52c1ad 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -44,6 +44,17 @@ local schema = {
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
include_req_body = {type = "boolean", default = false},
+ include_resp_body = {type = "boolean", default = false},
+ include_resp_body_expr = {
+ type = "array",
+ minItems = 1,
+ items = {
+ type = "array",
+ items = {
+ type = "string"
+ }
+ }
+ },
concat_method = {type = "string", default = "json",
enum = {"json", "new_line"}}
},
@@ -72,7 +83,12 @@ function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
- return core.schema.check(schema, conf)
+
+ local ok, err = core.schema.check(schema, conf)
+ if not ok then
+ return nil, err
+ end
+ return log_util.check_log_schema(conf)
end
@@ -162,6 +178,11 @@ local function remove_stale_objects(premature)
end
+function _M.body_filter(conf, ctx)
+ log_util.collect_body(conf, ctx)
+end
+
+
function _M.log(conf, ctx)
local metadata = plugin.plugin_metadata(plugin_name)
core.log.info("metadata: ", core.json.delay_encode(metadata))
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index f045c39..def042f 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -19,7 +19,6 @@ local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local plugin = require("apisix.plugin")
-local expr = require("resty.expr.v1")
local math = math
local pairs = pairs
@@ -85,6 +84,17 @@ local schema = {
}
}
},
+ include_resp_body = {type = "boolean", default = false},
+ include_resp_body_expr = {
+ type = "array",
+ minItems = 1,
+ items = {
+ type = "array",
+ items = {
+ type = "string"
+ }
+ }
+ },
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -109,19 +119,15 @@ local _M = {
function _M.check_schema(conf, schema_type)
-
- if conf.include_req_body_expr then
- local ok, err = expr.new(conf.include_req_body_expr)
- if not ok then
- return nil,
- {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
- end
- end
-
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
end
- return core.schema.check(schema, conf)
+
+ local ok, err = core.schema.check(schema, conf)
+ if not ok then
+ return nil, err
+ end
+ return log_util.check_log_schema(conf)
end
@@ -191,6 +197,11 @@ local function send_kafka_data(conf, log_message, prod)
end
+function _M.body_filter(conf, ctx)
+ log_util.collect_body(conf, ctx)
+end
+
+
function _M.log(conf, ctx)
local entry
if conf.meta_format == "origin" then
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index 3f82b92..3f268aa 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -123,6 +123,10 @@ local function get_full_log(ngx, conf)
latency = (ngx_now() - ngx.req.start_time()) * 1000
}
+ if ctx.resp_body then
+ log.response.body = ctx.resp_body
+ end
+
if conf.include_req_body then
local log_request_body = true
@@ -132,7 +136,7 @@ local function get_full_log(ngx, conf)
if not conf.request_expr then
local request_expr, err = expr.new(conf.include_req_body_expr)
if not request_expr then
- core.log.error('generate log expr err ' .. err)
+ core.log.error('generate request expr err ' .. err)
return log
end
conf.request_expr = request_expr
@@ -201,6 +205,57 @@ function _M.latency_details_in_ms(ctx)
end
+function _M.check_log_schema(conf)
+ if conf.include_req_body_expr then
+ local ok, err = expr.new(conf.include_req_body_expr)
+ if not ok then
+ return nil, "failed to validate the 'include_req_body_expr' expression: " .. err
+ end
+ end
+ if conf.include_resp_body_expr then
+ local ok, err = expr.new(conf.include_resp_body_expr)
+ if not ok then
+ return nil, "failed to validate the 'include_resp_body_expr' expression: " .. err
+ end
+ end
+ return true, nil
+end
+
+
+function _M.collect_body(conf, ctx)
+ if conf.include_resp_body then
+ local log_response_body = true
+
+ if conf.include_resp_body_expr then
+ if not conf.response_expr then
+ local response_expr, err = expr.new(conf.include_resp_body_expr)
+ if not response_expr then
+ core.log.error('generate response expr err ' .. err)
+ return
+ end
+ conf.response_expr = response_expr
+ end
+
+ if ctx.res_expr_eval_result == nil then
+ ctx.res_expr_eval_result = conf.response_expr:eval(ctx.var)
+ end
+
+ if not ctx.res_expr_eval_result then
+ log_response_body = false
+ end
+ end
+
+ if log_response_body then
+ local final_body = core.response.hold_body_chunk(ctx, true)
+ if not final_body then
+ return
+ end
+ ctx.resp_body = final_body
+ end
+ end
+end
+
+
function _M.get_rfc3339_zulu_timestamp(timestamp)
ngx_update_time()
local now = timestamp or ngx_now()
diff --git a/docs/en/latest/plugins/http-logger.md b/docs/en/latest/plugins/http-logger.md
index acc0bc0..c4f5056 100644
--- a/docs/en/latest/plugins/http-logger.md
+++ b/docs/en/latest/plugins/http-logger.md
@@ -49,7 +49,9 @@ This will provide the ability to send Log data requests as JSON objects to Monit
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. |
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
-| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
+| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
+| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
| concat_method | string | optional | "json" | ["json", "new_line"] | Enum type: `json` and `new_line`. **json**: use `json.encode` for all pending logs. **new_line**: use `json.encode` for each pending log and concat them with "\n" line. |
## How To Enable
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index 9aa3d92..504aa11 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -58,6 +58,8 @@ For more info on Batch-Processor in Apache APISIX please refer.
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. |
| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
| include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
+| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
| cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|
### examples of meta_format
diff --git a/docs/zh/latest/plugins/http-logger.md b/docs/zh/latest/plugins/http-logger.md
index 7ea4fd7..b253355 100644
--- a/docs/zh/latest/plugins/http-logger.md
+++ b/docs/zh/latest/plugins/http-logger.md
@@ -50,6 +50,8 @@ title: http-logger
| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。 |
+| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。当该值为 `true` 时,记录响应体。 |
+| include_resp_body_expr | array | 可选 | | | 是否采集响应体,基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。该选项需要开启 `include_resp_body`。|
| concat_method | string | 可选 | "json" | ["json", "new_line"] | 枚举类型: `json`、`new_line`。**json**: 对所有待发日志使用 `json.encode` 编码。**new_line**: 对每一条待发日志单独使用 `json.encode` 编码并使用 "\n" 连接起来。 |
## 如何开启
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index d51158a..8595533 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -58,6 +58,8 @@ title: kafka-logger
| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。|
| include_req_body_expr | array | 可选 | | | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 |
+| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。当该值为 `true` 时,记录响应体。 |
+| include_resp_body_expr | array | 可选 | | | 是否采集响应体,基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。该选项需要开启 `include_resp_body`。|
| cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|
### meta_format 参考示例
diff --git a/t/plugin/http-logger-json.t b/t/plugin/http-logger-json.t
index ed727c2..9787165 100644
--- a/t/plugin/http-logger-json.t
+++ b/t/plugin/http-logger-json.t
@@ -42,7 +42,7 @@ run_tests;
__DATA__
-=== TEST 1: json body
+=== TEST 1: json body with request_body
--- apisix_yaml
routes:
-
@@ -62,3 +62,126 @@ POST /hello
{"sample_payload":"hello"}
--- error_log
"body":"{\"sample_payload\":\"hello\"}"
+
+
+
+=== TEST 2: json body with response_body
+--- apisix_yaml
+routes:
+ -
+ uri: /hello
+ upstream:
+ nodes:
+ "127.0.0.1:1980": 1
+ type: roundrobin
+ plugins:
+ http-logger:
+ batch_max_size: 1
+ uri: http://127.0.0.1:1980/log
+ include_resp_body: true
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 3: json body with response_body and response_body expression
+--- apisix_yaml
+routes:
+ -
+ uri: /hello
+ upstream:
+ nodes:
+ "127.0.0.1:1980": 1
+ type: roundrobin
+ plugins:
+ http-logger:
+ batch_max_size: 1
+ uri: http://127.0.0.1:1980/log
+ include_resp_body: true
+ include_resp_body_expr:
+ - - arg_bar
+ - ==
+ - foo
+#END
+--- request
+POST /hello?bar=foo
+{"sample_payload":"hello"}
+--- error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 4: json body with response_body, expr not hit
+--- apisix_yaml
+routes:
+ -
+ uri: /hello
+ upstream:
+ nodes:
+ "127.0.0.1:1980": 1
+ type: roundrobin
+ plugins:
+ http-logger:
+ batch_max_size: 1
+ uri: http://127.0.0.1:1980/log
+ include_resp_body: true
+ include_resp_body_expr:
+ - - arg_bar
+ - ==
+ - foo
+#END
+--- request
+POST /hello?bar=bar
+{"sample_payload":"hello"}
+--- no_error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 5: json body with request_body and response_body
+--- apisix_yaml
+routes:
+ -
+ uri: /hello
+ upstream:
+ nodes:
+ "127.0.0.1:1980": 1
+ type: roundrobin
+ plugins:
+ http-logger:
+ batch_max_size: 1
+ uri: http://127.0.0.1:1980/log
+ include_req_body: true
+ include_resp_body: true
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log eval
+qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){2}/
+
+
+
+=== TEST 6: json body without request_body or response_body
+--- apisix_yaml
+routes:
+ -
+ uri: /hello
+ upstream:
+ nodes:
+ "127.0.0.1:1980": 1
+ type: roundrobin
+ plugins:
+ http-logger:
+ batch_max_size: 1
+ uri: http://127.0.0.1:1980/log
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log eval
+qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){0}/
diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t
index 1dd0122..9dd85db 100644
--- a/t/plugin/http-logger.t
+++ b/t/plugin/http-logger.t
@@ -784,3 +784,39 @@ qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/
--- grep_error_log_out
sending a batch logs to http://127.0.0.1:1982/hello
sending a batch logs to http://127.0.0.1:1982/hello1
+
+
+
+=== TEST 18: check log schema(include_resp_body_expr)
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.http-logger")
+ local ok, err = plugin.check_schema({uri = "http://127.0.0.1",
+ auth_header = "Basic 123",
+ timeout = 3,
+ name = "http-logger",
+ max_retry_count = 2,
+ retry_delay = 2,
+ buffer_duration = 2,
+ inactive_timeout = 2,
+ batch_max_size = 500,
+ include_resp_body = true,
+ include_resp_body_expr = {
+ {"bar", "<>", "foo"}
+ }
+ })
+ if not ok then
+ ngx.say(err)
+ end
+
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_resp_body_expr' expression: invalid operator '<>'
+done
+--- no_error_log
+[error]
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 5094910..42277c6 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -1193,3 +1193,146 @@ hello world
--- no_error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2
+
+
+
+=== TEST 29: check log schema(include_req_body)
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ kafka_topic = "test",
+ key = "key1",
+ broker_list = {
+ ["127.0.0.1"] = 3
+ },
+ include_req_body = true,
+ include_req_body_expr = {
+ {"bar", "<>", "foo"}
+ }
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_req_body_expr' expression: invalid operator '<>'
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 30: check log schema(include_resp_body)
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ kafka_topic = "test",
+ key = "key1",
+ broker_list = {
+ ["127.0.0.1"] = 3
+ },
+ include_resp_body = true,
+ include_resp_body_expr = {
+ {"bar", "<!>", "foo"}
+ }
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_resp_body_expr' expression: invalid operator '<!>'
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 31: set route(id: 1,include_resp_body = true,include_resp_body_expr = array)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "include_resp_body": true,
+ "include_resp_body_expr": [
+ [
+ "arg_name",
+ "==",
+ "qwerty"
+ ]
+ ],
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 32: hit route, expr eval success
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2
+
+
+
+=== TEST 33: hit route,expr eval fail
+--- request
+POST /hello?name=zcxv
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2