You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/11/26 03:12:58 UTC

[apisix] branch master updated: feat(http/kafka-logger): support to log response body (#5550)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 49a539b  feat(http/kafka-logger): support to log response body (#5550)
49a539b is described below

commit 49a539bbb8b97898a468a18b388637df3e7c4878
Author: Daming <zt...@foxmail.com>
AuthorDate: Fri Nov 26 11:12:53 2021 +0800

    feat(http/kafka-logger): support to log response body (#5550)
    
    Co-authored-by: 罗泽轩 <sp...@gmail.com>
---
 apisix/plugins/http-logger.lua         |  23 +++++-
 apisix/plugins/kafka-logger.lua        |  33 +++++---
 apisix/utils/log-util.lua              |  57 ++++++++++++-
 docs/en/latest/plugins/http-logger.md  |   4 +-
 docs/en/latest/plugins/kafka-logger.md |   2 +
 docs/zh/latest/plugins/http-logger.md  |   2 +
 docs/zh/latest/plugins/kafka-logger.md |   2 +
 t/plugin/http-logger-json.t            | 125 +++++++++++++++++++++++++++-
 t/plugin/http-logger.t                 |  36 +++++++++
 t/plugin/kafka-logger.t                | 143 +++++++++++++++++++++++++++++++++
 10 files changed, 412 insertions(+), 15 deletions(-)

diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 3d2e7f1..b52c1ad 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -44,6 +44,17 @@ local schema = {
         inactive_timeout = {type = "integer", minimum = 1, default = 5},
         batch_max_size = {type = "integer", minimum = 1, default = 1000},
         include_req_body = {type = "boolean", default = false},
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
         concat_method = {type = "string", default = "json",
                          enum = {"json", "new_line"}}
     },
@@ -72,7 +83,12 @@ function _M.check_schema(conf, schema_type)
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
     end
-    return core.schema.check(schema, conf)
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
 end
 
 
@@ -162,6 +178,11 @@ local function remove_stale_objects(premature)
 end
 
 
+function _M.body_filter(conf, ctx)
+    log_util.collect_body(conf, ctx)
+end
+
+
 function _M.log(conf, ctx)
     local metadata = plugin.plugin_metadata(plugin_name)
     core.log.info("metadata: ", core.json.delay_encode(metadata))
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index f045c39..def042f 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -19,7 +19,6 @@ local log_util = require("apisix.utils.log-util")
 local producer = require ("resty.kafka.producer")
 local batch_processor = require("apisix.utils.batch-processor")
 local plugin = require("apisix.plugin")
-local expr        = require("resty.expr.v1")
 
 local math     = math
 local pairs    = pairs
@@ -85,6 +84,17 @@ local schema = {
                 }
             }
         },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
         -- in lua-resty-kafka, cluster_name is defined as number
         -- see https://github.com/doujiang24/lua-resty-kafka#new-1
         cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -109,19 +119,15 @@ local _M = {
 
 
 function _M.check_schema(conf, schema_type)
-
-    if conf.include_req_body_expr then
-        local ok, err = expr.new(conf.include_req_body_expr)
-        if not ok then
-            return nil,
-            {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err}
-        end
-    end
-
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
     end
-    return core.schema.check(schema, conf)
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
 end
 
 
@@ -191,6 +197,11 @@ local function send_kafka_data(conf, log_message, prod)
 end
 
 
+function _M.body_filter(conf, ctx)
+    log_util.collect_body(conf, ctx)
+end
+
+
 function _M.log(conf, ctx)
     local entry
     if conf.meta_format == "origin" then
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index 3f82b92..3f268aa 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -123,6 +123,10 @@ local function get_full_log(ngx, conf)
         latency = (ngx_now() - ngx.req.start_time()) * 1000
     }
 
+    if ctx.resp_body then
+        log.response.body = ctx.resp_body
+    end
+
     if conf.include_req_body then
 
         local log_request_body = true
@@ -132,7 +136,7 @@ local function get_full_log(ngx, conf)
             if not conf.request_expr then
                 local request_expr, err = expr.new(conf.include_req_body_expr)
                 if not request_expr then
-                    core.log.error('generate log expr err ' .. err)
+                    core.log.error('generate request expr err ' .. err)
                     return log
                 end
                 conf.request_expr = request_expr
@@ -201,6 +205,57 @@ function _M.latency_details_in_ms(ctx)
 end
 
 
+function _M.check_log_schema(conf)
+    if conf.include_req_body_expr then
+        local ok, err = expr.new(conf.include_req_body_expr)
+        if not ok then
+            return nil, "failed to validate the 'include_req_body_expr' expression: " .. err
+        end
+    end
+    if conf.include_resp_body_expr then
+        local ok, err = expr.new(conf.include_resp_body_expr)
+        if not ok then
+            return nil, "failed to validate the 'include_resp_body_expr' expression: " .. err
+        end
+    end
+    return true, nil
+end
+
+
+function _M.collect_body(conf, ctx)
+    if conf.include_resp_body then
+        local log_response_body = true
+
+        if conf.include_resp_body_expr then
+            if not conf.response_expr then
+                local response_expr, err = expr.new(conf.include_resp_body_expr)
+                if not response_expr then
+                    core.log.error('generate response expr err ' .. err)
+                    return
+                end
+                conf.response_expr = response_expr
+            end
+
+            if ctx.res_expr_eval_result == nil then
+                ctx.res_expr_eval_result = conf.response_expr:eval(ctx.var)
+            end
+
+            if not ctx.res_expr_eval_result then
+                log_response_body = false
+            end
+        end
+
+        if log_response_body then
+            local final_body = core.response.hold_body_chunk(ctx, true)
+            if not final_body then
+                return
+            end
+            ctx.resp_body = final_body
+        end
+    end
+end
+
+
 function _M.get_rfc3339_zulu_timestamp(timestamp)
     ngx_update_time()
     local now = timestamp or ngx_now()
diff --git a/docs/en/latest/plugins/http-logger.md b/docs/en/latest/plugins/http-logger.md
index acc0bc0..c4f5056 100644
--- a/docs/en/latest/plugins/http-logger.md
+++ b/docs/en/latest/plugins/http-logger.md
@@ -49,7 +49,9 @@ This will provide the ability to send Log data requests as JSON objects to Monit
 | buffer_duration  | integer | optional    | 60            | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
 | max_retry_count  | integer | optional    | 0             | [0,...] | Maximum number of retries before removing from the processing pipe line.                 |
 | retry_delay      | integer | optional    | 1             | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
-| include_req_body | boolean | optional    | false         | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
+| include_req_body | boolean | optional    | false         | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
+| include_resp_body| boolean | optional    | false         | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr  | array  | optional    |          |         | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
 | concat_method    | string  | optional    | "json"        | ["json", "new_line"] | Enum type: `json` and `new_line`. **json**: use `json.encode` for all pending logs. **new_line**: use `json.encode` for each pending log and concat them with "\n" line. |
 
 ## How To Enable
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index 9aa3d92..504aa11 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -58,6 +58,8 @@ For more info on Batch-Processor in Apache APISIX please refer.
 | retry_delay      | integer | optional    | 1              | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
 | include_req_body | boolean | optional    | false          | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
 | include_req_body_expr  | array  | optional    |          |         | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
+| include_resp_body| boolean | optional    | false         | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr  | array  | optional    |          |         | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
 | cluster_name     | integer | optional    | 1              | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.|
 
 ### examples of meta_format
diff --git a/docs/zh/latest/plugins/http-logger.md b/docs/zh/latest/plugins/http-logger.md
index 7ea4fd7..b253355 100644
--- a/docs/zh/latest/plugins/http-logger.md
+++ b/docs/zh/latest/plugins/http-logger.md
@@ -50,6 +50,8 @@ title: http-logger
 | max_retry_count  | integer | 可选   | 0             | [0,...] | 从处理管道中移除之前的最大重试次数。               |
 | retry_delay      | integer | 可选   | 1             | [0,...] | 如果执行失败,则应延迟执行流程的秒数。             |
 | include_req_body | boolean | 可选   | false         | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。 |
+| include_resp_body| boolean | 可选   | false         | [false, true] | 是否包括响应体。仅当该值为 `true` 时才会包含响应体。 |
+| include_resp_body_expr | array  | 可选    |           |         | 当 `include_resp_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录响应体。 |
 | concat_method    | string  | 可选   | "json"        | ["json", "new_line"] | 枚举类型: `json`、`new_line`。**json**: 对所有待发日志使用 `json.encode` 编码。**new_line**: 对每一条待发日志单独使用 `json.encode` 编码并使用 "\n" 连接起来。 |
 
 ## 如何开启
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index d51158a..8595533 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -58,6 +58,8 @@ title: kafka-logger
 | retry_delay      | integer | 可选   | 1              | [0,...] | 如果执行失败,则应延迟执行流程的秒数。           |
 | include_req_body | boolean | 可选   | false          | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。|
 | include_req_body_expr | array  | 可选    |           |         | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 |
+| include_resp_body| boolean | 可选   | false          | [false, true] | 是否包括响应体。仅当该值为 `true` 时才会包含响应体。 |
+| include_resp_body_expr | array  | 可选    |           |         | 当 `include_resp_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录响应体。 |
 | cluster_name     | integer | 可选   | 1              | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|
 
 ### meta_format 参考示例
diff --git a/t/plugin/http-logger-json.t b/t/plugin/http-logger-json.t
index ed727c2..9787165 100644
--- a/t/plugin/http-logger-json.t
+++ b/t/plugin/http-logger-json.t
@@ -42,7 +42,7 @@ run_tests;
 
 __DATA__
 
-=== TEST 1: json body
+=== TEST 1: json body with request_body
 --- apisix_yaml
 routes:
   -
@@ -62,3 +62,126 @@ POST /hello
 {"sample_payload":"hello"}
 --- error_log
 "body":"{\"sample_payload\":\"hello\"}"
+
+
+
+=== TEST 2: json body with response_body
+--- apisix_yaml
+routes:
+  -
+    uri: /hello
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+    plugins:
+        http-logger:
+            batch_max_size: 1
+            uri: http://127.0.0.1:1980/log
+            include_resp_body: true
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 3: json body with response_body and response_body expression
+--- apisix_yaml
+routes:
+  -
+    uri: /hello
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+    plugins:
+        http-logger:
+            batch_max_size: 1
+            uri: http://127.0.0.1:1980/log
+            include_resp_body: true
+            include_resp_body_expr:
+                - - arg_bar
+                  - ==
+                  - foo
+#END
+--- request
+POST /hello?bar=foo
+{"sample_payload":"hello"}
+--- error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 4: json body with response_body, expr not hit
+--- apisix_yaml
+routes:
+  -
+    uri: /hello
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+    plugins:
+        http-logger:
+            batch_max_size: 1
+            uri: http://127.0.0.1:1980/log
+            include_resp_body: true
+            include_resp_body_expr:
+                - - arg_bar
+                  - ==
+                  - foo
+#END
+--- request
+POST /hello?bar=bar
+{"sample_payload":"hello"}
+--- no_error_log
+"response":{"body":"hello world\n"
+
+
+
+=== TEST 5: json body with request_body and response_body
+--- apisix_yaml
+routes:
+  -
+    uri: /hello
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+    plugins:
+        http-logger:
+            batch_max_size: 1
+            uri: http://127.0.0.1:1980/log
+            include_req_body: true
+            include_resp_body: true
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log eval
+qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){2}/
+
+
+
+=== TEST 6: json body without request_body or response_body
+--- apisix_yaml
+routes:
+  -
+    uri: /hello
+    upstream:
+        nodes:
+            "127.0.0.1:1980": 1
+        type: roundrobin
+    plugins:
+        http-logger:
+            batch_max_size: 1
+            uri: http://127.0.0.1:1980/log
+#END
+--- request
+POST /hello
+{"sample_payload":"hello"}
+--- error_log eval
+qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){0}/
diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t
index 1dd0122..9dd85db 100644
--- a/t/plugin/http-logger.t
+++ b/t/plugin/http-logger.t
@@ -784,3 +784,39 @@ qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/
 --- grep_error_log_out
 sending a batch logs to http://127.0.0.1:1982/hello
 sending a batch logs to http://127.0.0.1:1982/hello1
+
+
+
+=== TEST 18: check log schema(include_resp_body_expr)
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.http-logger")
+            local ok, err = plugin.check_schema({uri = "http://127.0.0.1",
+                                                 auth_header = "Basic 123",
+                                                 timeout = 3,
+                                                 name = "http-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 include_resp_body = true,
+                                                 include_resp_body_expr = {
+                                                     {"bar", "<>", "foo"}
+                                                 }
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_resp_body_expr' expression: invalid operator '<>'
+done
+--- no_error_log
+[error]
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 5094910..42277c6 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -1193,3 +1193,146 @@ hello world
 --- no_error_log eval
 qr/send data to kafka: \{.*"body":"abcdef"/
 --- wait: 2
+
+
+
+=== TEST 29: check log schema(include_req_body)
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                 kafka_topic = "test",
+                 key = "key1",
+                 broker_list = {
+                    ["127.0.0.1"] = 3
+                 },
+                 include_req_body = true,
+                 include_req_body_expr = {
+                     {"bar", "<>", "foo"}
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_req_body_expr' expression: invalid operator '<>'
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 30: check log schema(include_resp_body)
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                 kafka_topic = "test",
+                 key = "key1",
+                 broker_list = {
+                    ["127.0.0.1"] = 3
+                 },
+                 include_resp_body = true,
+                 include_resp_body_expr = {
+                     {"bar", "<!>", "foo"}
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed to validate the 'include_resp_body_expr' expression: invalid operator '<!>'
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 31: set route(id: 1,include_resp_body = true,include_resp_body_expr = array)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "include_resp_body": true,
+                                "include_resp_body_expr": [
+                                    [
+                                      "arg_name",
+                                      "==",
+                                      "qwerty"
+                                    ]
+                                ],
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 32: hit route, expr eval success
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2
+
+
+
+=== TEST 33: hit route,expr eval fail
+--- request
+POST /hello?name=zcxv
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2