You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/04/14 08:15:05 UTC
[apisix] branch master updated: plugin(kafka-logger): send logs in
async mode by default (#4035)
This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 7029bb9 plugin(kafka-logger): send logs in async mode by default (#4035)
7029bb9 is described below
commit 7029bb9cc6381effd9f71f6b06de22c29bba5964
Author: yejingx <ye...@gmail.com>
AuthorDate: Wed Apr 14 16:14:54 2021 +0800
plugin(kafka-logger): send logs in async mode by default (#4035)
---
apisix/plugins/kafka-logger.lua | 25 ++++++++++-
docs/en/latest/plugins/kafka-logger.md | 1 +
docs/zh/latest/plugins/kafka-logger.md | 1 +
t/plugin/kafka-logger.t | 77 ++++++++++++++++++++++++++++++++++
4 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 9bc8103..7cc46a9 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -18,6 +18,7 @@ local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
+local math = math
local pairs = pairs
local type = type
local ipairs = ipairs
@@ -44,6 +45,11 @@ local schema = {
type = "object"
},
kafka_topic = {type = "string"},
+ producer_type = {
+ type = "string",
+ default = "async",
+ enum = {"async", "sync"},
+ },
key = {type = "string"},
timeout = {type = "integer", minimum = 1, default = 3},
name = {type = "string", default = "kafka logger"},
@@ -70,7 +76,21 @@ function _M.check_schema(conf)
end
-local function get_partition_id(sendbuffer, topic, log_message)
+local function get_partition_id(prod, topic, log_message)
+ if prod.async then
+ local ringbuffer = prod.ringbuffer
+ for i = 1, ringbuffer.size, 3 do
+ if ringbuffer.queue[i] == topic and
+ ringbuffer.queue[i+2] == log_message then
+ return math.floor(i / 3)
+ end
+ end
+ core.log.info("current topic in ringbuffer has no message")
+ return nil
+ end
+
+ -- sync mode
+ local sendbuffer = prod.sendbuffer
if not sendbuffer.topics[topic] then
core.log.info("current topic in sendbuffer has no message")
return nil
@@ -115,7 +135,7 @@ local function send_kafka_data(conf, log_message, prod)
local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
core.log.info("partition_id: ",
core.log.delay_exec(get_partition_id,
- prod.sendbuffer, conf.kafka_topic, log_message))
+ prod, conf.kafka_topic, log_message))
if not ok then
return nil, "failed to send data to Kafka topic: " .. err
@@ -164,6 +184,7 @@ function _M.log(conf, ctx)
end
broker_config["request_timeout"] = conf.timeout * 1000
+ broker_config["producer_type"] = conf.producer_type
local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
broker_list, broker_config)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index ae7b301..e894117 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -47,6 +47,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
| broker_list | object | required | | | An array of Kafka brokers. |
| kafka_topic | string | required | | | Target topic to push data. |
+| producer_type | string | optional | async | ["async", "sync"] | Producer's mode of sending messages. |
| key | string | optional | | | Used for partition allocation of messages. |
| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. |
| name | string | optional | "kafka logger" | | A unique identifier to identity the batch processor. |
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 2bfcd5f..226faca 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -45,6 +45,7 @@ title: kafka-logger
| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ |
| broker_list | object | 必须 | | | 要推送的 kafka 的 broker 列表。 |
| kafka_topic | string | 必须 | | | 要推送的 topic。 |
+| producer_type | string | 可选 | async | ["async", "sync"] | 生产者发送消息的模式。 |
| key | string | 可选 | | | 用于消息的分区分配。 |
| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 |
| name | string | 可选 | "kafka logger" | | batch processor 的唯一标识。 |
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 7aa2e3b..46a520c 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -199,6 +199,7 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
+ "producer_type": "sync",
"key" : "key1",
"batch_max_size": 1
}
@@ -222,6 +223,7 @@ hello world
"127.0.0.1":9093
},
"kafka_topic" : "test2",
+ "producer_type": "sync",
"key" : "key1",
"batch_max_size": 1
}
@@ -627,6 +629,81 @@ passed
location /t {
content_by_lua_block {
local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 9092
+ },
+ "kafka_topic" : "test3",
+ "producer_type": "sync",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ }
+ }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/partition_id: 1/,
+qr/partition_id: 0/,
+qr/partition_id: 2/]
+
+
+
+=== TEST 19: report log to kafka by different partitions in async mode
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 9092
+ },
+ "kafka_topic" : "test3",
+ "producer_type": "async",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
t('/hello',ngx.HTTP_GET)
ngx.sleep(0.5)
t('/hello',ngx.HTTP_GET)