You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/04/14 08:15:05 UTC

[apisix] branch master updated: plugin(kafka-logger): send logs in async mode by default (#4035)

This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 7029bb9  plugin(kafka-logger): send logs in async mode by default (#4035)
7029bb9 is described below

commit 7029bb9cc6381effd9f71f6b06de22c29bba5964
Author: yejingx <ye...@gmail.com>
AuthorDate: Wed Apr 14 16:14:54 2021 +0800

    plugin(kafka-logger): send logs in async mode by default (#4035)
---
 apisix/plugins/kafka-logger.lua        | 25 ++++++++++-
 docs/en/latest/plugins/kafka-logger.md |  1 +
 docs/zh/latest/plugins/kafka-logger.md |  1 +
 t/plugin/kafka-logger.t                | 77 ++++++++++++++++++++++++++++++++++
 4 files changed, 102 insertions(+), 2 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 9bc8103..7cc46a9 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -18,6 +18,7 @@ local core     = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
 local producer = require ("resty.kafka.producer")
 local batch_processor = require("apisix.utils.batch-processor")
+local math     = math
 local pairs    = pairs
 local type     = type
 local ipairs   = ipairs
@@ -44,6 +45,11 @@ local schema = {
             type = "object"
         },
         kafka_topic = {type = "string"},
+        producer_type = {
+            type = "string",
+            default = "async",
+            enum = {"async", "sync"},
+        },
         key = {type = "string"},
         timeout = {type = "integer", minimum = 1, default = 3},
         name = {type = "string", default = "kafka logger"},
@@ -70,7 +76,21 @@ function _M.check_schema(conf)
 end
 
 
-local function get_partition_id(sendbuffer, topic, log_message)
+local function get_partition_id(prod, topic, log_message)
+    if prod.async then
+        local ringbuffer = prod.ringbuffer
+        for i = 1, ringbuffer.size, 3 do
+            if ringbuffer.queue[i] == topic and
+                ringbuffer.queue[i+2] == log_message then
+                return math.floor(i / 3)
+            end
+        end
+        core.log.info("current topic in ringbuffer has no message")
+        return nil
+    end
+
+    -- sync mode
+    local sendbuffer = prod.sendbuffer
     if not sendbuffer.topics[topic] then
         core.log.info("current topic in sendbuffer has no message")
         return nil
@@ -115,7 +135,7 @@ local function send_kafka_data(conf, log_message, prod)
     local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
     core.log.info("partition_id: ",
                   core.log.delay_exec(get_partition_id,
-                                      prod.sendbuffer, conf.kafka_topic, log_message))
+                                      prod, conf.kafka_topic, log_message))
 
     if not ok then
         return nil, "failed to send data to Kafka topic: " .. err
@@ -164,6 +184,7 @@ function _M.log(conf, ctx)
     end
 
     broker_config["request_timeout"] = conf.timeout * 1000
+    broker_config["producer_type"] = conf.producer_type
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
                                                broker_list, broker_config)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index ae7b301..e894117 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -47,6 +47,7 @@ For more info on Batch-Processor in Apache APISIX please refer.
 | ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
 | broker_list      | object  | required    |                |         | An array of Kafka brokers.                                                               |
 | kafka_topic      | string  | required    |                |         | Target  topic to push data.                                                              |
+| producer_type    | string  | optional    | async          | ["async", "sync"]        | Producer's mode of sending messages.          |
 | key              | string  | optional    |                |         | Used for partition allocation of messages.                                               |
 | timeout          | integer | optional    | 3              | [1,...] | Timeout for the upstream to send data.                                                   |
 | name             | string  | optional    | "kafka logger" |         | A  unique identifier to identity the batch processor.                                     |
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 2bfcd5f..226faca 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -45,6 +45,7 @@ title: kafka-logger
 | ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ |
 | broker_list      | object  | 必须   |                |         | 要推送的 kafka 的 broker 列表。                  |
 | kafka_topic      | string  | 必须   |                |         | 要推送的 topic。                                 |
+| producer_type    | string  | 可选   | async          | ["async", "sync"]        | 生产者发送消息的模式。          |
 | key              | string  | 可选   |                |         | 用于消息的分区分配。                             |
 | timeout          | integer | 可选   | 3              | [1,...] | 发送数据的超时时间。                             |
 | name             | string  | 可选   | "kafka logger" |         | batch processor 的唯一标识。                     |
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 7aa2e3b..46a520c 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -199,6 +199,7 @@ hello world
                                         "127.0.0.1":9093
                                       },
                                     "kafka_topic" : "test2",
+                                    "producer_type": "sync",
                                     "key" : "key1",
                                     "batch_max_size": 1
                              }
@@ -222,6 +223,7 @@ hello world
                                         "127.0.0.1":9093
                                       },
                                     "kafka_topic" : "test2",
+                                    "producer_type": "sync",
                                     "key" : "key1",
                                     "batch_max_size": 1
                                 }
@@ -627,6 +629,81 @@ passed
     location /t {
         content_by_lua_block {
             local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1": 9092
+                                },
+                                "kafka_topic" : "test3",
+                                "producer_type": "sync",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/partition_id: 1/,
+qr/partition_id: 0/,
+qr/partition_id: 2/]
+
+
+
+=== TEST 19: report log to kafka by different partitions in async mode
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1": 9092
+                                },
+                                "kafka_topic" : "test3",
+                                "producer_type": "async",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
             t('/hello',ngx.HTTP_GET)
             ngx.sleep(0.5)
             t('/hello',ngx.HTTP_GET)