You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/10/11 05:40:37 UTC

[apisix] branch master updated: feat(kafka-logger): support sasl config in brokers (#8050)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new c201d72e3 feat(kafka-logger): support sasl config in brokers (#8050)
c201d72e3 is described below

commit c201d72e398a08d3e36695f89576410afe541365
Author: Peter Zhu <st...@gmail.com>
AuthorDate: Tue Oct 11 13:40:30 2022 +0800

    feat(kafka-logger): support sasl config in brokers (#8050)
    
    Co-authored-by: 罗泽轩 <sp...@gmail.com>
    Co-authored-by: biubiue <hi...@gmail.com>
---
 apisix/plugins/kafka-logger.lua           |  16 ++++-
 ci/pod/docker-compose.plugin.yml          |   5 +-
 ci/pod/kafka/kafka-server/env/common2.env |   8 +++
 docs/en/latest/plugins/kafka-logger.md    |   4 ++
 docs/zh/latest/plugins/kafka-logger.md    |   4 ++
 t/plugin/kafka-logger.t                   | 116 ++++++++++++++++++++++++++++++
 t/plugin/kafka-logger2.t                  |  50 +++++++++++++
 7 files changed, 201 insertions(+), 2 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index f1489502b..ee8453e26 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -68,6 +68,20 @@ local schema = {
                         maximum = 65535,
                         description = "the port of kafka broker",
                     },
+                    sasl_config = {
+                        type = "object",
+                        description = "sasl config",
+                        properties = {
+                            mechanism = {
+                                type = "string",
+                                default = "PLAIN",
+                                enum = {"PLAIN"},
+                            },
+                            user = { type = "string", description = "user" },
+                            password =  { type = "string", description = "password" },
+                        },
+                        required = {"user", "password"},
+                    },
                 },
                 required = {"host", "port"},
             },
@@ -109,7 +123,7 @@ local schema = {
         producer_batch_num = {type = "integer", minimum = 1, default = 200},
         producer_batch_size = {type = "integer", minimum = 0, default = 1048576},
         producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
-        producer_time_linger = {type = "integer", minimum = 1, default = 1}
+        producer_time_linger = {type = "integer", minimum = 1, default = 1},
     },
     oneOf = {
         { required = {"broker_list", "kafka_topic"},},
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 4c0c4cb7e..6715c7a33 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -104,17 +104,20 @@ services:
   kafka-server2:
     image: bitnami/kafka:2.8.1
     env_file:
-      - ci/pod/kafka/kafka-server/env/common.env
+      - ci/pod/kafka/kafka-server/env/common2.env
     environment:
       KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server2:2181
     restart: unless-stopped
     ports:
       - "19092:9092"
+      - "19094:9094"
     depends_on:
       - zookeeper-server1
       - zookeeper-server2
     networks:
       kafka_net:
+    volumes:
+      - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
 
   ## SkyWalking
   skywalking:
diff --git a/ci/pod/kafka/kafka-server/env/common2.env b/ci/pod/kafka/kafka-server/env/common2.env
new file mode 100644
index 000000000..d07bf6d1a
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/env/common2.env
@@ -0,0 +1,8 @@
+ALLOW_PLAINTEXT_LISTENER=yes
+KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
+KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9094
+KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SASL_PLAINTEXT://127.0.0.1:9094
+KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
+KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
+KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
+KAFKA_CFG_SSL_KEY_PASSWORD=changeit
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index 5f007802a..2f49108a7 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -41,6 +41,10 @@ It might take some time to receive the log data. It will be automatically sent a
 | brokers                | array   | True     |                |                       | List of Kafka brokers (nodes).                                                                                                                                                                                                                                                                                                                   |
 | brokers.host           | string  | True     |                |                       | The host of Kafka broker, e.g, `192.168.1.1`.                                                                                                                                                                                                                                                                                                                   |
 | brokers.port           | integer | True     |                |   [0, 65535]                  |  The port of Kafka broker                                                                                                                                                                                                                                                                                                                  |
+| brokers.sasl_config    | object  | False    |                |                               |  The sasl config of Kafka broker                                                                                                                                                                                                                                                                                                                 |
+| brokers.sasl_config.mechanism  | string  | False    | "PLAIN"          | ["PLAIN"]           |     The mechanism of sasl config                                                                                                                                                                                                                                                                                                             |
+| brokers.sasl_config.user       | string  | True     |                  |                     |  The user of sasl_config. If sasl_config exists, it's required.                                                                                                                                                                                                                                                                                             |
+| brokers.sasl_config.password   | string  | True     |                  |                     | The password of sasl_config. If sasl_config exists, it's required.                                                                                                                                                                                                                                                                                                 |
 | kafka_topic            | string  | True     |                |                       | Target topic to push the logs for organisation.                                                                                                                                                                                                                                                                                                  |
 | producer_type          | string  | False    | async          | ["async", "sync"]     | Message sending mode of the producer.                                                                                                                                                                                                                                                                                                            |
 | required_acks          | integer | False    | 1              | [0, 1, -1]            | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index e14e1ed62..5a6fd987b 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -39,6 +39,10 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
 | brokers                | array   | 是     |                |                       | 需要推送的 Kafka 的 broker 列表。                   |
 | brokers.host           | string  | 是     |                |                       | Kafka broker 的节点 host 配置,例如 `192.168.1.1`                     |
 | brokers.port           | string  | 是     |                |                       | Kafka broker 的节点端口配置                         |
+| brokers.sasl_config    | object  | 否     |                |                       | Kafka broker 中的 sasl_config                     |
+| brokers.sasl_config.mechanism  | string  | 否     | "PLAIN"          | ["PLAIN"]   | Kafka broker 中的 sasl 认证机制                     |
+| brokers.sasl_config.user       | string  | 是     |                  |             | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写                 |
+| brokers.sasl_config.password   | string  | 是     |                  |             | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写             |
 | kafka_topic            | string  | 是     |                |                       | 需要推送的 topic。                                 |
 | producer_type          | string  | 否     | async          | ["async", "sync"]     | 生产者发送消息的模式。          |
 | required_acks          | integer | 否     | 1              | [0, 1, -1]            | 生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。该参数是为了保证发送请求的可靠性。该属性的配置与 Kafka `acks` 属性相同,具体配置请参考 [Apache Kafka 文档](https://kafka.apache.org/documentation/#producerconfigs_acks)。  |
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 218f14c2a..777e50329 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -592,3 +592,119 @@ qr/partition_id: 2/]
 [qr/partition_id: 1/,
 qr/partition_id: 0/,
 qr/partition_id: 2/]
+
+
+
+=== TEST 20: set route with incorrect sasl_config
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":19094,
+                                "sasl_config":{
+                                    "mechanism":"PLAIN",
+                                    "user":"admin",
+                                    "password":"admin-secret2233"
+                            }
+                        }],
+                            "kafka_topic":"test2",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 21: hit route, failed to send data to kafka
+--- request
+GET /hello
+--- response_body
+hello world
+--- error_log
+failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid username or password
+--- wait: 2
+
+
+
+=== TEST 22: set route with correct sasl_config
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":19094,
+                                "sasl_config":{
+                                    "mechanism":"PLAIN",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                            }
+                        }],
+                            "kafka_topic":"test2",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 23: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2
diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t
index 98179734b..6f670bbaf 100644
--- a/t/plugin/kafka-logger2.t
+++ b/t/plugin/kafka-logger2.t
@@ -406,6 +406,53 @@ qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
                         key = "key1",
                     },
                 },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 9093,
+                                sasl_config = {
+                                    mechanism = "INVALID",
+                                    user = "admin",
+                                    password = "admin-secret",
+                                },
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 9093,
+                                sasl_config = {
+                                    user = "admin",
+                                },
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
+                {
+                    input = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 9093,
+                                sasl_config = {
+                                    password = "admin-secret",
+                                },
+                            },
+                        },
+                        kafka_topic = "test",
+                        key = "key1",
+                    },
+                },
             }
 
             local plugin = require("apisix.plugins.kafka-logger")
@@ -433,6 +480,9 @@ property "brokers" validation failed: failed to validate item 1: property "host"
 property "brokers" validation failed: failed to validate item 1: property "port" validation failed: wrong type: expected integer, got string
 property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 0 to be at least 1
 property "brokers" validation failed: failed to validate item 1: property "port" validation failed: expected 65536 to be at most 65535
+property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "mechanism" validation failed: matches none of the enum values
+property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "password" is required
+property "brokers" validation failed: failed to validate item 1: property "sasl_config" validation failed: property "user" is required