You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/10/08 11:34:27 UTC

[apisix] branch master updated: feat: add brokers field to support set broker with the same host in kafka-logger plugin (#7999)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new e9ec16f2e feat: add brokers field to support set broker with the same host in kafka-logger plugin (#7999)
e9ec16f2e is described below

commit e9ec16f2e7ddb3142fdb2c90991736f73f499804
Author: Peter Zhu <st...@gmail.com>
AuthorDate: Sat Oct 8 19:34:19 2022 +0800

    feat: add brokers field to support set broker with the same host in kafka-logger plugin (#7999)
    
    Co-authored-by: Sylvia <39...@users.noreply.github.com>
---
 apisix/plugins/kafka-logger.lua        |  43 +++++++++--
 docs/en/latest/plugins/kafka-logger.md |  28 ++++---
 docs/zh/latest/plugins/kafka-logger.md |  29 ++++---
 t/plugin/kafka-logger.t                |   2 +-
 t/plugin/kafka-logger2.t               | 134 +++++++++++++++++++++++++++++++++
 5 files changed, 209 insertions(+), 27 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index cb43ae3db..f1489502b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -39,6 +39,7 @@ local schema = {
             default = "default",
             enum = {"default", "origin"},
         },
+        -- deprecated, use "brokers" instead
         broker_list = {
             type = "object",
             minProperties = 1,
@@ -51,6 +52,27 @@ local schema = {
                 },
             },
         },
+        brokers = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "object",
+                properties = {
+                    host = {
+                        type = "string",
+                        description = "the host of kafka broker",
+                    },
+                    port = {
+                        type = "integer",
+                        minimum = 1,
+                        maximum = 65535,
+                        description = "the port of kafka broker",
+                    },
+                },
+                required = {"host", "port"},
+            },
+            uniqueItems = true,
+        },
         kafka_topic = {type = "string"},
         producer_type = {
             type = "string",
@@ -89,7 +111,10 @@ local schema = {
         producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
         producer_time_linger = {type = "integer", minimum = 1, default = 1}
     },
-    required = {"broker_list", "kafka_topic"}
+    oneOf = {
+        { required = {"broker_list", "kafka_topic"},},
+        { required = {"brokers", "kafka_topic"},},
+    }
 }
 
 local metadata_schema = {
@@ -199,15 +224,17 @@ function _M.log(conf, ctx)
     end
 
     -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
-    local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
+    local broker_list = core.table.clone(conf.brokers or {})
     local broker_config = {}
 
-    for host, port in pairs(conf.broker_list) do
-        local broker = {
-            host = host,
-            port = port
-        }
-        core.table.insert(broker_list, broker)
+    if conf.broker_list then
+        for host, port in pairs(conf.broker_list) do
+            local broker = {
+                host = host,
+                port = port
+            }
+            core.table.insert(broker_list, broker)
+        end
     end
 
     broker_config["request_timeout"] = conf.timeout * 1000
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index c5cd6b935..5f007802a 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -37,7 +37,10 @@ It might take some time to receive the log data. It will be automatically sent a
 
 | Name                   | Type    | Required | Default        | Valid values          | Description                                                                                                                                                                                                                                                                                                                                      |
 | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
-| broker_list            | object  | True     |                |                       | List of Kafka brokers (nodes).                                                                                                                                                                                                                                                                                                                   |
+| broker_list            | object  | True     |                |                       | Deprecated, use `brokers` instead. List of Kafka brokers.  (nodes).                                                                                                                                                                                                                                                                                                                   |
+| brokers                | array   | True     |                |                       | List of Kafka brokers (nodes).                                                                                                                                                                                                                                                                                                                   |
+| brokers.host           | string  | True     |                |                       | The host of Kafka broker, e.g, `192.168.1.1`.                                                                                                                                                                                                                                                                                                                   |
+| brokers.port           | integer | True     |                |   [0, 65535]                  |  The port of Kafka broker                                                                                                                                                                                                                                                                                                                  |
 | kafka_topic            | string  | True     |                |                       | Target topic to push the logs for organisation.                                                                                                                                                                                                                                                                                                  |
 | producer_type          | string  | False    | async          | ["async", "sync"]     | Message sending mode of the producer.                                                                                                                                                                                                                                                                                                            |
 | required_acks          | integer | False    | 1              | [0, 1, -1]            | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
@@ -164,10 +167,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
 {
     "plugins": {
        "kafka-logger": {
-           "broker_list" :
+           "brokers" : [
              {
-               "127.0.0.1":9092
-             },
+               "host" :"127.0.0.1",
+               "port" : 9092
+             }
+            ],
            "kafka_topic" : "test2",
            "key" : "key1",
            "batch_max_size": 1,
@@ -187,11 +192,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13
 This Plugin also supports pushing to more than one broker at a time. You can specify multiple brokers in the Plugin configuration as shown below:
 
 ```json
-"broker_list" :
-  {
-    "127.0.0.1":9092,
-    "127.0.0.1":9093
-  },
+ "brokers" : [
+    {
+      "host" :"127.0.0.1",
+      "port" : 9092
+    },
+    {
+      "host" :"127.0.0.1",
+      "port" : 9093
+    }
+],
 ```
 
 ## Example usage
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index ba8bf493d..e14e1ed62 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -35,7 +35,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
 
 | 名称                   | 类型    | 必选项 | 默认值          | 有效值                | 描述                                             |
 | ---------------------- | ------- | ------ | -------------- | --------------------- | ------------------------------------------------ |
-| broker_list            | object  | 是     |                |                       | 需要推送的 Kafka 的 broker 列表。                  |
+| broker_list            | object  | 是     |                |                       | 已废弃,现使用 `brokers` 属性代替。原指需要推送的 Kafka 的 broker 列表。                  |
+| brokers                | array   | 是     |                |                       | 需要推送的 Kafka 的 broker 列表。                   |
+| brokers.host           | string  | 是     |                |                       | Kafka broker 的节点 host 配置,例如 `192.168.1.1`。                    |
+| brokers.port           | integer | 是     |                |                       | Kafka broker 的节点端口配置。                        |
 | kafka_topic            | string  | 是     |                |                       | 需要推送的 topic。                                 |
 | producer_type          | string  | 否     | async          | ["async", "sync"]     | 生产者发送消息的模式。          |
 | required_acks          | integer | 否     | 1              | [0, 1, -1]            | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。  |
@@ -162,10 +165,12 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
 {
     "plugins": {
        "kafka-logger": {
-           "broker_list" :
-             {
-               "127.0.0.1":9092
-             },
+            "brokers" : [
+              {
+               "host": "127.0.0.1",
+               "port": 9092
+              }
+            ],
            "kafka_topic" : "test2",
            "key" : "key1"
        }
@@ -183,10 +188,16 @@ curl http://127.0.0.1:9180/apisix/admin/routes/1 \
 该插件还支持一次推送到多个 Broker,示例如下:
 
 ```json
-{
-    "127.0.0.1":9092,
-    "127.0.0.1":9093
-}
+"brokers" : [
+    {
+      "host" :"127.0.0.1",
+      "port" : 9092
+    },
+    {
+      "host" :"127.0.0.1",
+      "port" : 9093
+    }
+],
 ```
 
 ## 测试插件
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 5b894abce..218f14c2a 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -72,7 +72,7 @@ done
         }
     }
 --- response_body
-property "broker_list" is required
+value should match only one schema, but matches none
 done
 
 
diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t
index 77bcbbff6..98179734b 100644
--- a/t/plugin/kafka-logger2.t
+++ b/t/plugin/kafka-logger2.t
@@ -340,6 +340,72 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
                         key= "key1",
                     },
                 },
+                {
+                    input = {
+                        brokers = {
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                            }
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                port = 9092,
+                            }
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = "9093",
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 0,
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 65536,
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
             }
 
             local plugin = require("apisix.plugins.kafka-logger")
@@ -361,6 +427,12 @@ property "broker_list" validation failed: expect object to have at least 1 prope
 property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string
 property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be at least 1
 property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be at most 65535
+property "brokers" validation failed: expect array to have at least 1 items
+property "brokers" validation failed: failed to validate item 1: property "port" is required
+property "brokers" validation failed: failed to validate item 1: property "host" is required
+property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
+property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
+property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535
 
 
 
@@ -715,3 +787,65 @@ hello world
 [qr/send data to kafka: \{.*"body":"abcdef"/,
 qr/send data to kafka: \{.*"body":"hello world\\n"/]
 --- wait: 2
+
+
+
+=== TEST 20: update route(id: 1,include_req_body = true,include_req_body_expr = array)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "brokers" :
+                                  [{
+                                    "host":"127.0.0.1",
+                                    "port": 9092
+                                  }],
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "include_req_body": true,
+                                "include_req_body_expr": [
+                                    [
+                                      "arg_name",
+                                      "==",
+                                      "qwerty"
+                                    ]
+                                ],
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 21: hit route, expr eval success
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2