You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by st...@apache.org on 2022/10/09 09:31:46 UTC

[apisix] branch kafka_sasl created (now 82e81e6a3)

This is an automated email from the ASF dual-hosted git repository.

starsz pushed a change to branch kafka_sasl
in repository https://gitbox.apache.org/repos/asf/apisix.git


      at 82e81e6a3 fix: ci

This branch includes the following new commits:

     new 82e81e6a3 fix: ci

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[apisix] 01/01: fix: ci

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

starsz pushed a commit to branch kafka_sasl
in repository https://gitbox.apache.org/repos/asf/apisix.git

commit 82e81e6a31a7c3b0bb71776b8f9fd9442ba544ad
Author: starsz <st...@gmail.com>
AuthorDate: Sun Oct 9 17:31:28 2022 +0800

    fix: ci
---
 apisix/plugins/kafka-logger.lua           |  25 +----
 ci/pod/docker-compose.plugin.yml          |  36 ++-----
 ci/pod/kafka/kafka-server/env/common2.env |   8 ++
 docs/en/latest/plugins/kafka-logger.md    |  11 +-
 t/plugin/kafka-logger.t                   | 164 +++++++++++++-----------------
 5 files changed, 87 insertions(+), 157 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 95187bfd1..972a5f5ff 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -73,10 +73,11 @@ local schema = {
                         description = "sasl config",
                         properties = {
                             mechanism = { type = "string", description = "mechanism", default = "PLAIN" },
+                            user = { type = "string", description = "user" },
                             password =  { type = "string", description = "password" },
-                            user = { type = "string", description = "user" }
                         },
-                        required = {"password", "user"},
+                        required = {"user", "password"},
+                    },
                 },
                 required = {"host", "port"},
             },
@@ -119,11 +120,6 @@ local schema = {
         producer_batch_size = {type = "integer", minimum = 0, default = 1048576},
         producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
         producer_time_linger = {type = "integer", minimum = 1, default = 1},
-        client_ssl = {type = "boolean", default = false},
-        client_ssl_verify = {type = "boolean", default = false},
-        client_socket_timeout = {type = "integer", default = 3000},
-        client_keepalive_timeout = {type = "integer", default = 600},
-        client_keepalive_size = {type = "integer", default = 2}
     },
     oneOf = {
         { required = {"broker_list", "kafka_topic"},},
@@ -241,15 +237,6 @@ function _M.log(conf, ctx)
     local broker_list = core.table.clone(conf.brokers or {})
     local broker_config = {}
 
-<<<<<<< HEAD
-    for host, port in pairs(conf.broker_list) do
-        local broker = {
-            host = host,
-            port = port,
-            sasl_config = conf.sasl_config or nil
-        }
-        core.table.insert(broker_list, broker)
-=======
     if conf.broker_list then
         for host, port in pairs(conf.broker_list) do
             local broker = {
@@ -258,7 +245,6 @@ function _M.log(conf, ctx)
             }
             core.table.insert(broker_list, broker)
         end
->>>>>>> master
     end
 
     broker_config["request_timeout"] = conf.timeout * 1000
@@ -268,11 +254,6 @@ function _M.log(conf, ctx)
     broker_config["batch_size"] = conf.producer_batch_size
     broker_config["max_buffering"] = conf.producer_max_buffering
     broker_config["flush_time"] = conf.producer_time_linger * 1000
-    broker_config["ssl"] = conf.client_ssl
-    broker_config["ssl_verify"] = conf.client_ssl_verify
-    broker_config["socket_timeout"] = conf.client_socket_timeout
-    broker_config["keepalive_timeout"] = conf.client_keepalive_timeout * 1000
-    broker_config["keepalive_size"] = conf.client_keepalive_size
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
                                                broker_list, broker_config, conf.cluster_name)
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 4c0c4cb7e..f16fa2693 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -104,17 +104,22 @@ services:
   kafka-server2:
     image: bitnami/kafka:2.8.1
     env_file:
-      - ci/pod/kafka/kafka-server/env/common.env
+      - ci/pod/kafka/kafka-server/env/common2.env
     environment:
       KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server2:2181
     restart: unless-stopped
     ports:
       - "19092:9092"
+      - "19094:9094"
     depends_on:
       - zookeeper-server1
       - zookeeper-server2
     networks:
       kafka_net:
+    volumes:
+      - ./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
+      - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
+      - ./ci/pod/kafka/kafka-server/selfsigned.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
 
   ## SkyWalking
   skywalking:
@@ -247,35 +252,6 @@ services:
       xpack.security.enabled: 'true'
 
 
-  # The function services of OpenFunction
-  test-header:
-    image: test-header-image:latest
-    restart: unless-stopped
-    ports:
-      - "30583:8080"
-    environment:
-      CONTEXT_MODE: "self-host"
-      FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}"
-
-  test-uri:
-    image: test-uri-image:latest
-    restart: unless-stopped
-    ports:
-      - "30584:8080"
-    environment:
-      CONTEXT_MODE: "self-host"
-      FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}"
-
-  test-body:
-    image: test-body-image:latest
-    restart: unless-stopped
-    ports:
-      - "30585:8080"
-    environment:
-      CONTEXT_MODE: "self-host"
-      FUNC_CONTEXT: "{\"name\":\"HelloWorld\",\"version\":\"v1.0.0\",\"port\":\"8080\",\"runtime\":\"Knative\"}"
-
-
 networks:
   apisix_net:
   kafka_net:
diff --git a/ci/pod/kafka/kafka-server/env/common2.env b/ci/pod/kafka/kafka-server/env/common2.env
new file mode 100644
index 000000000..d07bf6d1a
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/env/common2.env
@@ -0,0 +1,8 @@
+ALLOW_PLAINTEXT_LISTENER=yes
+KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
+KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9094
+KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092,SASL_PLAINTEXT://127.0.0.1:9094
+KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
+KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/config/certs/kafka.keystore.jks
+KAFKA_CFG_SSL_KEYSTORE_PASSWORD=changeit
+KAFKA_CFG_SSL_KEY_PASSWORD=changeit
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index 3d9904522..714cc6d78 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -37,16 +37,12 @@ It might take some time to receive the log data. It will be automatically sent a
 
 | Name                   | Type    | Required | Default        | Valid values          | Description                                                                                                                                                                                                                                                                                                                                      |
 | ---------------------- | ------- | -------- | -------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
-<<<<<<< HEAD
-| broker_list            | object  | True     |                |                       | List of Kafka brokers (nodes).                                                                                                                                                                                                                                                                                                                   |
-
-=======
 | broker_list            | object  | True     |                |                       | Deprecated, use `brokers` instead. List of Kafka brokers.  (nodes).                                                                                                                                                                                                                                                                                                                   |
 | brokers                | array   | True     |                |                       | List of Kafka brokers (nodes).                                                                                                                                                                                                                                                                                                                   |
 | brokers.host           | string  | True     |                |                       | The host of Kafka broker, e.g, `192.168.1.1`.                                                                                                                                                                                                                                                                                                                   |
 | brokers.port           | integer | True     |                |   [0, 65535]                  |  The port of Kafka broker                                                                                                                                                                                                                                                                                                                  |
 | brokers.sasl_config    | object  | False    |                |                               |  The sasl config of Kafka broker                                                                                                                                                                                                                                                                                                                 |
-| brokers.sasl_config.mechanism  | string  | False    | PLAIN          | "PLAIN"              |     The mechaism of sasl config                                                                                                                                                                                                                                                                                                             |
+| brokers.sasl_config.mechanism  | string  | False    | "PLAIN"          | ["PLAIN"]              |     The mechaism of sasl config                                                                                                                                                                                                                                                                                                             |
 | brokers.sasl_config.user   | string  | True     |                |                          |    The user of sasl_config.If sasl_config exists, it's required.                                                                                                                                                                                                                                                                                             |
 | brokers.sasl_config.password  | string  | True   |                |                       | The password of sasl_config.If sasl_config exists, it's required.                                                                                                                                                                                                                                                                                                 |
 | kafka_topic            | string  | True     |                |                       | Target topic to push the logs for organisation.                                                                                                                                                                                                                                                                                                  |
@@ -65,11 +61,6 @@ It might take some time to receive the log data. It will be automatically sent a
 | producer_batch_size    | integer | optional    | 1048576        | [0,...]               | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.                                                                                                                                                                                                                                                                                                             [...]
 | producer_max_buffering | integer | optional    | 50000          | [1,...]               | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count.                                                                                                                                                                                                                                                           [...]
 | producer_time_linger   | integer | optional    | 1              | [1,...]               | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds.                                                                                                                                                                                                                                                                                                           [...]
-| client_ssl             | boolean | optional    | false           | [true,false]          | `ssl` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka)                                                                                                                                                                                                                                                                                                                             [...]
-| client_ssl_verify      | boolean | optional    | false           | [true,false]          | `ssl_verify` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) .                                                                                                                                                                                                                                                                                                                    [...]
-| client_socket_timeout  | integer | optional    | 3000            | [1,...]               | `socket_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka),Unit is seconds.                                                                                                                                                                                                                                                                                                 [...]
-| client_keepalive_timeout  | integer | optional    | 600            | [1,...]               | `keepalive_timeout` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) ,Unit is millisecond.                                                                                                                                                                                                                                                                                       [...]
-| client_keepalive_size  | integer | optional    | 2            | [1,...]               | `keepalive_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) .                                                                                                                                                                                                                                                                                                                   [...]
 
 This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
 
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index ec3b9344f..4064c10b4 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -595,7 +595,7 @@ qr/partition_id: 2/]
 
 
 
-=== TEST 20: set route(id: 1)
+=== TEST 20: set route with incorrect sasl_config
 --- config
     location /t {
         content_by_lua_block {
@@ -603,32 +603,33 @@ qr/partition_id: 2/]
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
                  [[{
-                        "plugins": {
-                            "kafka-logger": {
-                                "broker_list" :
-                                  {
-                                    "127.0.0.1":9092
-                                  },
-                                "kafka_topic" : "test2",
-                                "key" : "key1",
-                                "timeout" : 1,
-                                "batch_max_size": 1,
-                                "sasl_config": {
-                                    "mechanism": "PLAIN",
-                                    "user": "admin",
-                                    "password": "admin-secret"
-                                }
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":19094,
+                                "sasl_config":{
+                                    "mechanism":"PLAIN",
+                                    "user":"admin",
+                                    "password":"admin-secret2233"
                             }
+                        }],
+                            "kafka_topic":"test2",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
                         },
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/hello"
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
                 }]]
-                )
+            )
             if code >= 300 then
                 ngx.status = code
             end
@@ -640,52 +641,18 @@ passed
 
 
 
-=== TEST 21: inject create producer
---- extra_init_by_lua
-    local producer = require("resty.kafka.producer")
-    producer.new = function(self, broker_list, producer_config, cluster_name)
-        local opts = producer_config or {}
-        local cli = {
-                broker_list = broker_list,
-                topic_partitions = {},
-                brokers = {},
-                api_versions = {},
-                client_id = "worker" .. pid(),
-                socket_config = {
-                    socket_timeout = opts.socket_timeout or 3000,
-                    keepalive_timeout = opts.keepalive_timeout or (600 * 1000),
-                    keepalive_size = opts.keepalive_size or 2,
-                    ssl = opts.ssl or false,
-                    ssl_verify = opts.ssl_verify or false,
-                    resolver = opts.resolver or nil
-                }
-            }
-        return {
-            client = cli,
-            correlation_id = 1,
-            request_timeout = opts.request_timeout or 2000,
-            retry_backoff = opts.retry_backoff or 100,
-            max_retry = opts.max_retry or 3,
-            required_acks = opts.required_acks or 1,
-            partitioner = opts.partitioner,
-            error_handle = opts.error_handle,
-            api_version = opts.api_version or 0,
-            async = opts.producer_type == "async",
-            socket_config = cli.socket_config,
-            _timer_flushing_buffer = false,
-            ringbuffer = ringbuffer:new(opts.batch_num or 200, opts.max_buffering or 50000),
-            sendbuffer = sendbuffer:new(opts.batch_num or 200, opts.batch_size or 1048576)
-        }
-    end
+=== TEST 21: hit route, failed to send data to kafka 
 --- request
 GET /hello
 --- response_body
 hello world
+--- error_log
+failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid username or password
 --- wait: 2
 
 
 
-=== TEST 22: error log
+=== TEST 22: set route with correct sasl_config
 --- config
     location /t {
         content_by_lua_block {
@@ -693,44 +660,51 @@ hello world
             local code, body = t('/apisix/admin/routes/1',
                  ngx.HTTP_PUT,
                  [[{
-                        "plugins": {
-                             "kafka-logger": {
-                                    "broker_list" :
-                                      {
-                                        "127.0.0.1":9092,
-                                        "127.0.0.1":9093
-                                      },
-                                    "kafka_topic" : "test2",
-                                    "key" : "key1",
-                                    "timeout" : 1,
-                                    "batch_max_size": 1,
-                                    "sasl_config": {
-                                        "mechanism": "PLAIN",
-                                        "user": "admin",
-                                        "password": "admin-secret"
-                                    }
-                             }
-                        },
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            },
-                            "type": "roundrobin"
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":19094,
+                                "sasl_config":{
+                                    "mechanism":"PLAIN",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                            }
+                        }],
+                            "kafka_topic":"test2",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
                         },
-                        "uri": "/hello"
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
                 }]]
-                )
+            )
             if code >= 300 then
                 ngx.status = code
             end
             ngx.say(body)
-            local http = require "resty.http"
-            local httpc = http.new()
-            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
-            local res, err = httpc:request_uri(uri, {method = "GET"})
         }
     }
---- error_log
-failed to send data to Kafka topic
-[error]
---- wait: 1
+--- response_body
+passed
+
+
+
+=== TEST 23: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2