Posted to notifications@apisix.apache.org by we...@apache.org on 2020/10/09 08:33:15 UTC
[apisix] branch master updated: feat: add new field `meta_format`, collect the request information in `origin` style. (#2364)
This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 73dfdf2 feat: add new field `meta_format`, collect the request information in `origin` style. (#2364)
73dfdf2 is described below
commit 73dfdf2185c52acb395c6627dd9d4294efdb3733
Author: YuanSheng Wang <me...@gmail.com>
AuthorDate: Fri Oct 9 16:33:07 2020 +0800
feat: add new field `meta_format`, collect the request information in `origin` style. (#2364)
---
apisix/plugins/kafka-logger.lua | 31 +++++--
apisix/utils/log-util.lua | 26 +++++-
doc/plugins/kafka-logger.md | 26 +++++-
doc/zh-cn/plugins/kafka-logger.md | 33 +++++--
t/plugin/kafka-logger.t | 186 +++++++++++++++++++++++++++++++++++++-
5 files changed, 280 insertions(+), 22 deletions(-)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 6377bf1..0f7e767 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -32,6 +32,11 @@ local buffers = {}
local schema = {
type = "object",
properties = {
+ meta_format = {
+ type = "string",
+ default = "default",
+ enum = {"default", "origin"},
+ },
broker_list = {
type = "object"
},
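For orientation, here is a standalone sketch (not APISIX's real JSON-Schema validator) of how the new enum behaves: `meta_format` falls back to `"default"` when omitted, and anything outside the enum is rejected.

```lua
-- minimal stand-in for the JSON-Schema check above
local allowed = { default = true, origin = true }

local function check_meta_format(conf)
    conf.meta_format = conf.meta_format or "default"  -- schema default
    if not allowed[conf.meta_format] then
        return nil, "meta_format must be one of: default, origin"
    end
    return true
end

print(check_meta_format({}))                          -- true ("default" implied)
print(check_meta_format({ meta_format = "origin" }))  -- true
print(check_meta_format({ meta_format = "raw" }))     -- nil plus error message
```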
@@ -93,7 +98,7 @@ local function send_kafka_data(conf, log_message)
return nil, "failed to send data to Kafka topic: " .. err
end
- return true, nil
+ return true
end
-- remove stale objects from the memory after timer expires
@@ -113,22 +118,24 @@ local function remove_stale_objects(premature)
end
-function _M.log(conf)
- local entry = log_util.get_full_log(ngx, conf)
+function _M.log(conf, ctx)
+ local entry
+ if conf.meta_format == "origin" then
+ entry = log_util.get_req_original(ctx, conf)
+ -- core.log.info("origin entry: ", entry)
- if not entry.route_id then
- core.log.error("failed to obtain the route id for kafka logger")
- return
+ else
+ entry = log_util.get_full_log(ngx, conf)
+ core.log.info("full log entry: ", core.json.delay_encode(entry))
end
- local log_buffer = buffers[entry.route_id]
-
if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end
+ local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
@@ -138,7 +145,10 @@ function _M.log(conf)
local func = function(entries, batch_max_size)
local data, err
if batch_max_size == 1 then
- data, err = core.json.encode(entries[1]) -- encode as single {}
+ data = entries[1]
+ if type(data) ~= "string" then
+ data, err = core.json.encode(data) -- encode as single {}
+ end
else
data, err = core.json.encode(entries) -- encode as array [{}]
end
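A condensed, standalone illustration of the branch above: with `meta_format = "origin"` each entry is already a raw string and is forwarded untouched, while table entries (the `default` format) still go through JSON encoding. Here `cjson` stands in for `core.json`, assumed to wrap a cjson-compatible encoder.

```lua
local cjson = require("cjson")  -- assumption: any cjson-compatible encoder

local function serialize(entries, batch_max_size)
    if batch_max_size == 1 then
        local data = entries[1]
        if type(data) ~= "string" then  -- "default" entries are Lua tables
            return cjson.encode(data)   -- encode as single {}
        end
        return data                     -- "origin" entries are raw strings
    end
    return cjson.encode(entries)        -- encode as array [{}]
end

print(serialize({ "GET /hello HTTP/1.1\r\n\r\n" }, 1))  -- raw request, untouched
print(serialize({ { route_id = "1" } }, 1))             -- {"route_id":"1"}
```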
@@ -147,6 +157,7 @@ function _M.log(conf)
return false, 'error occurred while encoding the data: ' .. err
end
+ core.log.info("send data to kafka: ", data)
return send_kafka_data(conf, data)
end
@@ -167,7 +178,7 @@ function _M.log(conf)
return
end
- buffers[entry.route_id] = log_buffer
+ buffers[conf] = log_buffer
log_buffer:push(entry)
end
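One consequence of the change above: buffers are now keyed by the plugin `conf` table instead of `entry.route_id`. That removes the hard dependency on `route_id` (absent when `meta_format` is `origin`, since the entry is a plain string) and gives every distinct plugin configuration its own batch processor. A standalone sketch of the keying behavior:

```lua
-- Lua tables hash by identity, so the conf table itself can key the registry.
local buffers = {}

local function get_buffer(conf)
    local buf = buffers[conf]
    if not buf then
        buf = { entries = {} }  -- stand-in for a real batch processor
        buffers[conf] = buf
    end
    return buf
end

local conf_a = { kafka_topic = "t1" }
local conf_b = { kafka_topic = "t2" }
assert(get_buffer(conf_a) == get_buffer(conf_a))  -- same conf, same buffer
assert(get_buffer(conf_a) ~= get_buffer(conf_b))  -- distinct confs, distinct buffers
print("ok")
```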
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index b11a435..131d9b2 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -14,10 +14,13 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-local core = require("apisix.core")
+local core = require("apisix.core")
+local ngx = ngx
+local pairs = pairs
local _M = {}
+
local function get_full_log(ngx, conf)
local ctx = ngx.ctx.api_ctx
local var = ctx.var
@@ -71,6 +74,25 @@ local function get_full_log(ngx, conf)
return log
end
-
_M.get_full_log = get_full_log
+
+
+function _M.get_req_original(ctx, conf)
+ local headers = {
+ ctx.var.request, "\r\n"
+ }
+ for k, v in pairs(ngx.req.get_headers()) do
+ core.table.insert_tail(headers, k, ": ", v, "\r\n")
+ end
+ -- core.log.error("headers: ", core.table.concat(headers, ""))
+ core.table.insert(headers, "\r\n")
+
+ if conf.include_req_body then
+ core.table.insert(headers, ctx.var.request_body)
+ end
+
+ return core.table.concat(headers, "")
+end
+
+
return _M
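A standalone sketch of what `get_req_original` assembles, with mocked inputs in place of `ctx.var.request`, `ngx.req.get_headers()`, and `ctx.var.request_body` (note that header order from `pairs` is not deterministic):

```lua
local request_line     = "GET /hello?ab=cd HTTP/1.1"
local req_headers      = { host = "localhost", ["content-length"] = "6" }
local request_body     = "abcdef"
local include_req_body = true

local parts = { request_line, "\r\n" }
for k, v in pairs(req_headers) do
    -- core.table.insert_tail appends several values at once;
    -- plain table.insert calls produce the same layout
    table.insert(parts, k)
    table.insert(parts, ": ")
    table.insert(parts, v)
    table.insert(parts, "\r\n")
end
table.insert(parts, "\r\n")  -- blank line terminating the header block
if include_req_body then
    table.insert(parts, request_body)
end

print(table.concat(parts, ""))
-- GET /hello?ab=cd HTTP/1.1
-- host: localhost
-- content-length: 6
--
-- abcdef
```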
diff --git a/doc/plugins/kafka-logger.md b/doc/plugins/kafka-logger.md
index d0b7380..9b6bb47 100644
--- a/doc/plugins/kafka-logger.md
+++ b/doc/plugins/kafka-logger.md
@@ -20,6 +20,7 @@
- [中文](../zh-cn/plugins/kafka-logger.md)
# Summary
+
- [**Name**](#name)
- [**Attributes**](#attributes)
- [**Info**](#info)
@@ -27,7 +28,6 @@
- [**Test Plugin**](#test-plugin)
- [**Disable Plugin**](#disable-plugin)
-
## Name
`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.
@@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| key | string | required | | | Key for the message. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identify the batch processor |
+| meta_format | string | optional | "default" | enum: `default`, `origin`| `default`: collect the request information in the default JSON format. `origin`: collect the request information as the original HTTP request. [example](#examples-of-meta_format)|
| batch_max_size | integer | optional | 1000 | [1,...] | Max size of each batch |
| inactive_timeout | integer | optional | 5 | [1,...] | Maximum time in seconds the buffer can be inactive before it is flushed |
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed |
@@ -55,6 +56,25 @@ For more info on Batch-Processor in Apache APISIX please refer.
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails |
| include_req_body | boolean | optional | false | | Whether to include the request body |
+### examples of meta_format
+
+- **default**:
+
+ ```json
+ {"upstream":"127.0.0.1:1980","start_time":1602211788041,"client_ip":"127.0.0.1","service_id":"","route_id":"1","request":{"querystring":{"ab":"cd"},"size":90,"uri":"\/hello?ab=cd","url":"http:\/\/localhost:1984\/hello?ab=cd","headers":{"host":"localhost","content-length":"6","connection":"close"},"body":"abcdef","method":"GET"},"response":{"headers":{"content-type":"text\/plain","server":"APISIX\/1.5","connection":"close","transfer-encoding":"chunked"},"status":200,"size":153},"laten [...]
+ ```
+
+- **origin**:
+
+ ```http
+ GET /hello?ab=cd HTTP/1.1
+ host: localhost
+ content-length: 6
+ connection: close
+
+ abcdef
+ ```
+
## Info
The `message` will be written to the buffer first.
@@ -64,7 +84,7 @@ or every `buffer_duration` flush the buffer.
In case of success, returns `true`.
In case of errors, returns `nil` with a string describing the error (`buffer overflow`).
-##### Sample broker list
+### Sample broker list
This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the sample below to enable this functionality.
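The sample itself falls outside this hunk's context; for orientation, the tests later in this commit pass `broker_list` as a host-to-port map, which in Lua terms looks like the fragment below (the second broker is a hypothetical addition for illustration):

```lua
-- plugin conf fragment: each key is a broker host, each value its port
local conf = {
    broker_list = {
        ["127.0.0.1"] = 9092,
        ["127.0.0.2"] = 9092,  -- hypothetical second broker
    },
    kafka_topic = "test2",
    key = "key1",
}
```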
@@ -107,7 +127,7 @@ curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
## Test Plugin
-* success:
+*success:
```shell
$ curl -i http://127.0.0.1:9080/hello
diff --git a/doc/zh-cn/plugins/kafka-logger.md b/doc/zh-cn/plugins/kafka-logger.md
index d19936e..48b5adb 100644
--- a/doc/zh-cn/plugins/kafka-logger.md
+++ b/doc/zh-cn/plugins/kafka-logger.md
@@ -20,6 +20,7 @@
- [English](../../plugins/kafka-logger.md)
# Table of Contents
+
- [**Introduction**](#introduction)
- [**Attributes**](#attributes)
- [**How It Works**](#how-it-works)
@@ -45,6 +46,7 @@
| key | string | required | | | Key for the message. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identify the batch processor |
+| meta_format | string | optional | "default" | enum: `default`, `origin`| `default`: collect the request information in the default JSON format. `origin`: collect the request information as the original HTTP request. [example](#examples-of-meta_format)|
| batch_max_size | integer | optional | 1000 | [1,...] | Max size of each batch |
| inactive_timeout | integer | optional | 5 | [1,...] | Maximum time in seconds the buffer can be inactive before it is flushed |
| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed |
@@ -52,16 +54,35 @@
| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails |
| include_req_body | boolean | optional | | | Whether to include the request body |
+### Examples of meta_format
+
+- **default**:
+
+ ```json
+ {"upstream":"127.0.0.1:1980","start_time":1602211788041,"client_ip":"127.0.0.1","service_id":"","route_id":"1","request":{"querystring":{"ab":"cd"},"size":90,"uri":"\/hello?ab=cd","url":"http:\/\/localhost:1984\/hello?ab=cd","headers":{"host":"localhost","content-length":"6","connection":"close"},"body":"abcdef","method":"GET"},"response":{"headers":{"content-type":"text\/plain","server":"APISIX\/1.5","connection":"close","transfer-encoding":"chunked"},"status":200,"size":153},"laten [...]
+ ```
+
+- **origin**:
+
+ ```http
+ GET /hello?ab=cd HTTP/1.1
+ host: localhost
+ content-length: 6
+ connection: close
+
+ abcdef
+ ```
+
## How It Works
The message will be written to the buffer first.
-When the buffer exceeds `batch_max_size`, it is sent to the kafka server,
+When the buffer exceeds `batch_max_size`, it is sent to the Kafka server,
or the buffer is flushed every `buffer_duration`.
-If successful, it returns "true".
-If an error occurs, it returns "nil", with a string describing the error (`buffer overflow`).
+If successful, it returns `true`.
+If an error occurs, it returns `nil`, with a string describing the error (`buffer overflow`).
-##### Broker list
+### Broker list
The plugin supports pushing to more than one broker at a time, configured as follows:
@@ -101,7 +122,7 @@ curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
## Test Plugin
-* success:
+ success
```shell
$ curl -i http://127.0.0.1:9080/hello
@@ -112,7 +133,7 @@ hello, world
## Disable Plugin
-When you want to disable the `kafka-logger` plugin, it is very simple: just remove the corresponding json configuration from the plugin configuration. There is no need to restart the service; it takes effect immediately:
+When you want to disable the `kafka-logger` plugin, it is very simple: just remove the corresponding JSON configuration from the plugin configuration. There is no need to restart the service; it takes effect immediately:
```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d value='
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index a4e0814..945cccf 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -101,7 +101,7 @@ done
-=== TEST 4: add plugin
+=== TEST 4: set route(id: 1)
--- config
location /t {
content_by_lua_block {
@@ -255,3 +255,187 @@ GET /t
failed to send data to Kafka topic
[error]
--- wait: 1
+
+
+
+=== TEST 7: set route(meta_format = origin, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": true,
+ "meta_format": "origin"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+
+abcdef
+--- wait: 2
+
+
+
+=== TEST 9: set route(meta_format = origin, include_req_body = false)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false,
+ "meta_format": "origin"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 11: set route(meta_format = default)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: hit route, report log to kafka
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2