You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by we...@apache.org on 2020/10/09 08:33:15 UTC

[apisix] branch master updated: feat: add new field `meta_format`, collect the request information with `origin` style. (#2364)

This is an automated email from the ASF dual-hosted git repository.

wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 73dfdf2  feat: add new field `meta_format`, collect the request information with `origin` style. (#2364)
73dfdf2 is described below

commit 73dfdf2185c52acb395c6627dd9d4294efdb3733
Author: YuanSheng Wang <me...@gmail.com>
AuthorDate: Fri Oct 9 16:33:07 2020 +0800

    feat: add new field `meta_format`, collect the request information with `origin` style. (#2364)
---
 apisix/plugins/kafka-logger.lua   |  31 +++++--
 apisix/utils/log-util.lua         |  26 +++++-
 doc/plugins/kafka-logger.md       |  26 +++++-
 doc/zh-cn/plugins/kafka-logger.md |  33 +++++--
 t/plugin/kafka-logger.t           | 186 +++++++++++++++++++++++++++++++++++++-
 5 files changed, 280 insertions(+), 22 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 6377bf1..0f7e767 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -32,6 +32,11 @@ local buffers = {}
 local schema = {
     type = "object",
     properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
         broker_list = {
             type = "object"
         },
@@ -93,7 +98,7 @@ local function send_kafka_data(conf, log_message)
         return nil, "failed to send data to Kafka topic" .. err
     end
 
-    return true, nil
+    return true
 end
 
 -- remove stale objects from the memory after timer expires
@@ -113,22 +118,24 @@ local function remove_stale_objects(premature)
 end
 
 
-function _M.log(conf)
-    local entry = log_util.get_full_log(ngx, conf)
+function _M.log(conf, ctx)
+    local entry
+    if conf.meta_format == "origin" then
+        entry = log_util.get_req_original(ctx, conf)
+        -- core.log.info("origin entry: ", entry)
 
-    if not entry.route_id then
-        core.log.error("failed to obtain the route id for kafka logger")
-        return
+    else
+        entry = log_util.get_full_log(ngx, conf)
+        core.log.info("full log entry: ", core.json.delay_encode(entry))
     end
 
-    local log_buffer = buffers[entry.route_id]
-
     if not stale_timer_running then
         -- run the timer every 30 mins if any log is present
         timer_at(1800, remove_stale_objects)
         stale_timer_running = true
     end
 
+    local log_buffer = buffers[conf]
     if log_buffer then
         log_buffer:push(entry)
         return
@@ -138,7 +145,10 @@ function _M.log(conf)
     local func = function(entries, batch_max_size)
         local data, err
         if batch_max_size == 1 then
-            data, err = core.json.encode(entries[1]) -- encode as single {}
+            data = entries[1]
+            if type(data) ~= "string" then
+                data, err = core.json.encode(data) -- encode as single {}
+            end
         else
             data, err = core.json.encode(entries) -- encode as array [{}]
         end
@@ -147,6 +157,7 @@ function _M.log(conf)
             return false, 'error occurred while encoding the data: ' .. err
         end
 
+        core.log.info("send data to kafka: ", data)
         return send_kafka_data(conf, data)
     end
 
@@ -167,7 +178,7 @@ function _M.log(conf)
         return
     end
 
-    buffers[entry.route_id] = log_buffer
+    buffers[conf] = log_buffer
     log_buffer:push(entry)
 end
 
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index b11a435..131d9b2 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -14,10 +14,13 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-local core     = require("apisix.core")
+local core = require("apisix.core")
+local ngx  = ngx
+local pairs = pairs
 
 local _M = {}
 
+
 local function get_full_log(ngx, conf)
     local ctx = ngx.ctx.api_ctx
     local var = ctx.var
@@ -71,6 +74,25 @@ local function get_full_log(ngx, conf)
 
     return log
 end
-
 _M.get_full_log = get_full_log
+
+
+function _M.get_req_original(ctx, conf)
+    local headers = {
+        ctx.var.request, "\r\n"
+    }
+    for k, v in pairs(ngx.req.get_headers()) do
+        core.table.insert_tail(headers, k, ": ", v, "\r\n")
+    end
+    -- core.log.error("headers: ", core.table.concat(headers, ""))
+    core.table.insert(headers, "\r\n")
+
+    if conf.include_req_body then
+        core.table.insert(headers, ctx.var.request_body)
+    end
+
+    return core.table.concat(headers, "")
+end
+
+
 return _M
diff --git a/doc/plugins/kafka-logger.md b/doc/plugins/kafka-logger.md
index d0b7380..9b6bb47 100644
--- a/doc/plugins/kafka-logger.md
+++ b/doc/plugins/kafka-logger.md
@@ -20,6 +20,7 @@
 - [中文](../zh-cn/plugins/kafka-logger.md)
 
 # Summary
+
 - [**Name**](#name)
 - [**Attributes**](#attributes)
 - [**Info**](#info)
@@ -27,7 +28,6 @@
 - [**Test Plugin**](#test-plugin)
 - [**Disable Plugin**](#disable-plugin)
 
-
 ## Name
 
 `kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.
@@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
 | key              | string  | required    |                |         | Key for the message.                                                                     |
 | timeout          | integer | optional    | 3              | [1,...] | Timeout for the upstream to send data.                                                   |
 | name             | string  | optional    | "kafka logger" |         | A unique identifier to identify the batch processor                                      |
+| meta_format       | string  | optional    | "default"      | enum: `default`, `origin`| `default`: collect the request information in the default JSON format. `origin`: collect the request information as the original HTTP request. [example](#examples-of-meta_format)|
 | batch_max_size   | integer | optional    | 1000           | [1,...] | Max size of each batch                                                                   |
 | inactive_timeout | integer | optional    | 5              | [1,...] | Maximum age in seconds when the buffer will be flushed if inactive                       |
 | buffer_duration  | integer | optional    | 60             | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed |
@@ -55,6 +56,25 @@ For more info on Batch-Processor in Apache APISIX please refer.
 | retry_delay      | integer | optional    | 1              | [0,...] | Number of seconds the process execution should be delayed if the execution fails         |
 | include_req_body | boolean | optional    | false          |         | Whether to include the request body                                                      |
 
+### examples of meta_format
+
+- **default**:
+
+    ```json
+    {"upstream":"127.0.0.1:1980","start_time":1602211788041,"client_ip":"127.0.0.1","service_id":"","route_id":"1","request":{"querystring":{"ab":"cd"},"size":90,"uri":"\/hello?ab=cd","url":"http:\/\/localhost:1984\/hello?ab=cd","headers":{"host":"localhost","content-length":"6","connection":"close"},"body":"abcdef","method":"GET"},"response":{"headers":{"content-type":"text\/plain","server":"APISIX\/1.5","connection":"close","transfer-encoding":"chunked"},"status":200,"size":153},"laten [...]
+    ```
+
+- **origin**:
+
+    ```http
+    GET /hello?ab=cd HTTP/1.1
+    host: localhost
+    content-length: 6
+    connection: close
+
+    abcdef
+    ```
+
 ## Info
 
 The `message` will write to the buffer first.
@@ -64,7 +84,7 @@ or every `buffer_duration` flush the buffer.
 In case of success, returns `true`.
 In case of errors, returns `nil` with a string describing the error (`buffer overflow`).
 
-##### Sample broker list
+### Sample broker list
 
 This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the
 sample below to make use of this functionality.
@@ -107,7 +127,7 @@ curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
 
 ## Test Plugin
 
-* success:
+*success:
 
 ```shell
 $ curl -i http://127.0.0.1:9080/hello
diff --git a/doc/zh-cn/plugins/kafka-logger.md b/doc/zh-cn/plugins/kafka-logger.md
index d19936e..48b5adb 100644
--- a/doc/zh-cn/plugins/kafka-logger.md
+++ b/doc/zh-cn/plugins/kafka-logger.md
@@ -20,6 +20,7 @@
 - [English](../../plugins/kafka-logger.md)
 
 # 目录
+
 - [**简介**](#简介)
 - [**属性**](#属性)
 - [**工作原理**](#工作原理)
@@ -45,6 +46,7 @@
 | key              | string  | 必须   |                |         | 用于加密消息的密钥。                             |
 | timeout          | integer | 可选   | 3              | [1,...] | 发送数据的超时时间。                             |
 | name             | string  | 可选   | "kafka logger" |         | batch processor 的唯一标识。                     |
+| meta_format       | string  | 可选   | "default"      | 枚举:`default`,`origin`| `default`:获取请求信息以默认的 JSON 编码方式。`origin`:获取请求信息以 HTTP 原始请求方式。[具体示例](#meta_format-参考示例)|
 | batch_max_size   | integer | 可选   | 1000           | [1,...] | 每批的最大大小                                   |
 | inactive_timeout | integer | 可选   | 5              | [1,...] | 刷新缓冲区的最大时间(以秒为单位)               |
 | buffer_duration  | integer | 可选   | 60             | [1,...] | 必须先处理批次中最旧条目的最长期限(以秒为单位) |
@@ -52,16 +54,35 @@
 | retry_delay      | integer | 可选   | 1              | [0,...] | 如果执行失败,则应延迟执行流程的秒数             |
 | include_req_body | boolean | 可选   |                |         | 是否包括请求 body                                |
 
+### meta_format 参考示例
+
+- **default**:
+
+    ```json
+    {"upstream":"127.0.0.1:1980","start_time":1602211788041,"client_ip":"127.0.0.1","service_id":"","route_id":"1","request":{"querystring":{"ab":"cd"},"size":90,"uri":"\/hello?ab=cd","url":"http:\/\/localhost:1984\/hello?ab=cd","headers":{"host":"localhost","content-length":"6","connection":"close"},"body":"abcdef","method":"GET"},"response":{"headers":{"content-type":"text\/plain","server":"APISIX\/1.5","connection":"close","transfer-encoding":"chunked"},"status":200,"size":153},"laten [...]
+    ```
+
+- **origin**:
+
+    ```http
+    GET /hello?ab=cd HTTP/1.1
+    host: localhost
+    content-length: 6
+    connection: close
+
+    abcdef
+    ```
+
 ## 工作原理
 
 消息将首先写入缓冲区。
-当缓冲区超过`batch_max_size`时,它将发送到kafka服务器,
+当缓冲区超过`batch_max_size`时,它将发送到 kafka 服务器,
 或每个`buffer_duration`刷新缓冲区。
 
-如果成功,则返回“ true”。
-如果出现错误,则返回“ nil”,并带有描述错误的字符串(`buffer overflow`)。
+如果成功,则返回 `true`。
+如果出现错误,则返回 `nil`,并带有描述错误的字符串(`buffer overflow`)。
 
-##### Broker 列表
+### Broker 列表
 
 插件支持一次推送到多个 Broker,如下配置:
 
@@ -101,7 +122,7 @@ curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
 
 ## 测试插件
 
-* 成功:
+ 成功
 
 ```shell
 $ curl -i http://127.0.0.1:9080/hello
@@ -112,7 +133,7 @@ hello, world
 
 ## 禁用插件
 
-当您要禁用`kafka-logger`插件时,这很简单,您可以在插件配置中删除相应的json配置,无需重新启动服务,它将立即生效:
+当您要禁用`kafka-logger`插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效:
 
 ```shell
 $ curl http://127.0.0.1:2379/apisix/admin/routes/1  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d value='
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index a4e0814..945cccf 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -101,7 +101,7 @@ done
 
 
 
-=== TEST 4: add plugin
+=== TEST 4: set route(id: 1)
 --- config
     location /t {
         content_by_lua_block {
@@ -255,3 +255,187 @@ GET /t
 failed to send data to Kafka topic
 [error]
 --- wait: 1
+
+
+
+=== TEST 7: set route(meta_format = origin, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": true,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+
+abcdef
+--- wait: 2
+
+
+
+=== TEST 9: set route(meta_format = origin, include_req_body = false)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 11: set route(meta_format = default)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2