You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2021/11/30 18:12:28 UTC

[GitHub] [apisix] yuz10 opened a new pull request #5653: feat: rocketmq logger

yuz10 opened a new pull request #5653:
URL: https://github.com/apache/apisix/pull/5653


   ### What this PR does / why we need it:
   feature: add rocketmq-logger plugin,
   which sends logs to rocketmq.
   ### Pre-submission checklist:
   
   <!--
   Please follow the requirements:
   1. Use Draft if the PR is not ready to be reviewed
   2. Test is required for the feat/fix PR, unless you have a good reason
   3. Doc is required for the feat PR
   4. Use a new commit to resolve review instead of `push -f`
   5. Use "request review" to notify the reviewer once you have resolved the review
   -->
   
   - [x] Did you explain what problem does this PR solve? Or what new features have been added?
   - [x] Have you added corresponding test cases?
   - [x] Have you modified the corresponding document?
   - [x] Is this PR backward compatible? **If it is not backward compatible, please discuss on the [mailing list](https://github.com/apache/apisix/tree/master#community) first**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762658128



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       Sorry, I don't understand what you mean. The nameserver is added to err, and the test case will verify the error log if sending fails, like https://github.com/yuz10/apisix/blob/master/t/plugin/rocketmq-logger.t#L862




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r761585147



##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,234 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
+
+If you do not receive the log data right away, don't worry: the logs are sent automatically once the timer in our Batch Processor expires.
+
+For more info on the Batch-Processor in Apache APISIX, please refer to
+[Batch-Processor](../batch-processor.md).
+
+## Attributes
+
+| Name             | Type    | Requirement | Default        | Valid   | Description                                                                              |
+| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
+| nameserver_list  | array   | required    |                |         | An array of rocketmq nameservers.                                                               |
+| topic            | string  | required    |                |         | Target  topic to push data.                                                              |
+| key              | string  | optional    |                |         | Keys of messages to send.                                               |
+| tag              | string  | optional   |                |         | Tags of messages to send.                           |
+| timeout          | integer | optional    | 3              | [1,...] | Timeout for the upstream to send data.                                                   |
+| use_tls          | boolean | optional   | false          |         | Whether to open TLS                          |
+| access_key       | string  | optional   | ""             |         | access key for ACL, empty string means disable ACL.     |
+| secret_key       | string  | optional   | ""             |         | secret key for ACL.                         |
+| name             | string  | optional    | "rocketmq logger" |         | A unique identifier to identify the batch processor.                                      |
+| meta_format      | enum    | optional    | "default"      | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)|
+| batch_max_size   | integer | optional    | 1000           | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service.                         |
+| inactive_timeout | integer | optional    | 5              | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. |
+| buffer_duration  | integer | optional    | 60             | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
+| max_retry_count  | integer | optional    | 0              | [0,...] | Maximum number of retries before removing from the processing pipe line.                 |
+| retry_delay      | integer | optional    | 1              | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
+| include_req_body | boolean | optional    | false          | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
+| include_req_body_expr  | array  | optional    |          |         | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
+| include_resp_body| boolean | optional    | false         | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr  | array  | optional    |          |         | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
+
+### examples of meta_format
+
+- **default**:
+
+```json
+    {
+     "upstream": "127.0.0.1:1980",
+     "start_time": 1619414294760,
+     "client_ip": "127.0.0.1",
+     "service_id": "",
+     "route_id": "1",
+     "request": {
+       "querystring": {
+         "ab": "cd"
+       },
+       "size": 90,
+       "uri": "/hello?ab=cd",
+       "url": "http://localhost:1984/hello?ab=cd",
+       "headers": {
+         "host": "localhost",
+         "content-length": "6",
+         "connection": "close"
+       },
+       "body": "abcdef",
+       "method": "GET"
+     },
+     "response": {
+       "headers": {
+         "connection": "close",
+         "content-type": "text/plain; charset=utf-8",
+         "date": "Mon, 26 Apr 2021 05:18:14 GMT",
+         "server": "APISIX/2.5",
+         "transfer-encoding": "chunked"
+       },
+       "size": 190,
+       "status": 200
+     },
+     "server": {
+       "hostname": "localhost",
+       "version": "2.5"
+     },
+     "latency": 0
+    }
+```
+
+- **origin**:
+
+```http
+    GET /hello?ab=cd HTTP/1.1
+    host: localhost
+    content-length: 6
+    connection: close
+
+    abcdef
+```
+
+## Info
+
+The `message` is written to the buffer first.
+It is sent to the rocketmq server when the buffer exceeds `batch_max_size`,
+or when the buffer is flushed every `buffer_duration`.
+
+In case of success, returns `true`.
+In case of errors, returns `nil` with a string describing the error (`buffer overflow`).
+
+### Sample Nameserver list
+
+Specify the nameservers of the external rocketmq servers as in the sample below.
+
+```json
+{
+    "127.0.0.1":9876,

Review comment:
       Need to update the nameserver format.

##########
File path: docs/zh/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,229 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 目录
+
+- [**简介**](#简介)
+- [**属性**](#属性)
+- [**工作原理**](#工作原理)
+- [**如何启用**](#如何启用)
+- [**测试插件**](#测试插件)
+- [**禁用插件**](#禁用插件)
+
+## 简介
+
+`rocketmq-logger` 插件可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。
+
+如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。
+
+有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。
+[Batch-Processor](../batch-processor.md)
+
+## 属性
+
+| 名称             | 类型    | 必选项 | 默认值         | 有效值  | 描述                                             |
+| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ |
+| nameserver_list  | array   | 必须   |                |         | 要推送的 rocketmq 的 nameserver 列表。        |
+| topic            | string  | 必须   |                |         | 要推送的 topic。                             |
+| key              | string  | 可选   |                |         | 发送消息的keys。                             |
+| tag              | string  | 可选   |                |         | 发送消息的tags。                             |
+| timeout          | integer | 可选   | 3              | [1,...] | 发送数据的超时时间。                          |
+| use_tls          | boolean | 可选   | false          |         | 是否开启TLS加密。                             |
+| access_key       | string  | 可选   | ""             |         | ACL认证的access key,空字符串表示不开启ACL。     |
+| secret_key       | string  | 可选   | ""             |         | ACL认证的secret key。                         |
+| name             | string  | 可选   | "rocketmq logger" |         | batch processor 的唯一标识。               |
+| meta_format      | enum    | 可选   | "default"      | ["default","origin"] | `default`:获取请求信息以默认的 JSON 编码方式。`origin`:获取请求信息以 HTTP 原始请求方式。[具体示例](#meta_format-参考示例)|
+| batch_max_size   | integer | 可选   | 1000           | [1,...] | 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 `rocketmq` 服务。|
+| inactive_timeout | integer | 可选   | 5              | [1,...] | 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 `rocketmq` 服务。 |
+| buffer_duration  | integer | 可选   | 60             | [1,...] | 必须先处理批次中最旧条目的最长期限(以秒为单位)。 |
+| max_retry_count  | integer | 可选   | 0              | [0,...] | 从处理管道中移除之前的最大重试次数。             |
+| retry_delay      | integer | 可选   | 1              | [0,...] | 如果执行失败,则应延迟执行流程的秒数。           |
+| include_req_body | boolean | 可选   | false          | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。|
+| include_req_body_expr | array  | 可选    |           |         | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 |
+| include_resp_body| boolean | 可选   | false          | [false, true] | 是否包括响应体。包含响应体,当为`true`。 |
+| include_resp_body_expr | array  | 可选    |           |         | 是否采集响应体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`|
+
+### meta_format 参考示例
+
+- **default**:
+
+```json
+    {
+     "upstream": "127.0.0.1:1980",
+     "start_time": 1619414294760,
+     "client_ip": "127.0.0.1",
+     "service_id": "",
+     "route_id": "1",
+     "request": {
+       "querystring": {
+         "ab": "cd"
+       },
+       "size": 90,
+       "uri": "/hello?ab=cd",
+       "url": "http://localhost:1984/hello?ab=cd",
+       "headers": {
+         "host": "localhost",
+         "content-length": "6",
+         "connection": "close"
+       },
+       "body": "abcdef",
+       "method": "GET"
+     },
+     "response": {
+       "headers": {
+         "connection": "close",
+         "content-type": "text/plain; charset=utf-8",
+         "date": "Mon, 26 Apr 2021 05:18:14 GMT",
+         "server": "APISIX/2.5",
+         "transfer-encoding": "chunked"
+       },
+       "size": 190,
+       "status": 200
+     },
+     "server": {
+       "hostname": "localhost",
+       "version": "2.5"
+     },
+     "latency": 0
+    }
+```
+
+- **origin**:
+
+```http
+    GET /hello?ab=cd HTTP/1.1
+    host: localhost
+    content-length: 6
+    connection: close
+
+    abcdef
+```
+
+## 工作原理
+
+消息将首先写入缓冲区。
+当缓冲区超过 `batch_max_size` 时,它将发送到 rocketmq 服务器,
+或每个 `buffer_duration` 刷新缓冲区。
+
+如果成功,则返回 `true`。
+如果出现错误,则返回 `nil`,并带有描述错误的字符串(`buffer overflow`)。
+
+### Nameserver 列表
+
+配置多个nameserver地址如下:
+
+```json
+{
+    "127.0.0.1":9876,

Review comment:
       Ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760836989



##########
File path: ci/pod/docker-compose.yml
##########
@@ -355,10 +355,34 @@ services:
     networks:
       apisix_net:
 
+  namesrv:

Review comment:
       @leslie-tsang thanks, I found another way




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760727805



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "object",
+            minProperties = 1,
+            patternProperties = {
+                [".*"] = {
+                    description = "the port of rocketmq nameserver",
+                    type = "integer",
+                    minimum = 1,
+                    maximum = 65535,
+                },
+            },
+        },
+        rocketmq_topic = {type = "string"},

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760727687



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "object",
+            minProperties = 1,
+            patternProperties = {

Review comment:
       changed to [ "127.0.0.1:9876" ]




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r761745979



##########
File path: t/plugin/rocketmq-logger.t
##########
@@ -0,0 +1,1098 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();

Review comment:
       LGTM. We can improve the test in a separate PR:
   
   The newly added test file should use a preprocessor to reduce duplicate `--- request` & `--- no_error_log` sections. You can take a look at:
   
   https://github.com/apache/apisix/blob/11433d740c5e19b757c61f59842d6de64e27fba8/t/plugin/gzip.t#L33-L42
   
   And please split it into rocketmq-logger.t & rocketmq-logger2.t. Each test file should be smaller than 800L.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762654690



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] vongosling commented on pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
vongosling commented on pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#issuecomment-984531134


   Please keep going :-) I'd like to see this feature ready, as a cooperation between both communities.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762665326



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       Which nameserver caused the error could be returned by the lua-resty-rocketmq library. We can create an issue there.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760089341



##########
File path: docs/zh/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,232 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 目录
+
+- [**简介**](#简介)
+- [**属性**](#属性)
+- [**工作原理**](#工作原理)
+- [**如何启用**](#如何启用)
+- [**测试插件**](#测试插件)
+- [**禁用插件**](#禁用插件)
+
+## 简介
+
+`rocketmq-logger` 是一个插件,可用作ngx_lua nginx 模块的 rocketmq 客户端驱动程序。

Review comment:
       fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] leslie-tsang commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
leslie-tsang commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760768602



##########
File path: docs/zh/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,229 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 目录
+
+- [**简介**](#简介)
+- [**属性**](#属性)
+- [**工作原理**](#工作原理)
+- [**如何启用**](#如何启用)
+- [**测试插件**](#测试插件)
+- [**禁用插件**](#禁用插件)
+
+## 简介
+
+`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的请求日志到外部rocketmq集群。
+
+它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。

Review comment:
       The translation seems to be out of sync.
   BTW, please add a space between English and Chinese text; refer to the [document-style-guide](https://github.com/ruanyf/document-style-guide/blob/master/docs/text.md#%E5%AD%97%E9%97%B4%E8%B7%9D).

##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,234 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
+
+This plugin provides the ability to push requests log data as JSON objects to your external rocketmq clusters. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor.

Review comment:
       Why should we repeat it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760703274



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "object",
+            minProperties = 1,
+            patternProperties = {
+                [".*"] = {
+                    description = "the port of rocketmq nameserver",
+                    type = "integer",
+                    minimum = 1,
+                    maximum = 65535,
+                },
+            },
+        },
+        rocketmq_topic = {type = "string"},

Review comment:
       Can we use `topic` directly? Since this is already a rocketmq plugin, there is no need to repeat rocketmq.

##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "object",
+            minProperties = 1,
+            patternProperties = {

Review comment:
       Is it necessary to use an IP:PORT key-value structure? With it, there is no way to configure two brokers with the same IP, even though they can have different ports.

##########
File path: ci/pod/docker-compose.yml
##########
@@ -355,10 +355,34 @@ services:
     networks:
       apisix_net:
 
+  namesrv:

Review comment:
       Better to add a rocketmq prefix for the service, because this file is a common file.

##########
File path: ci/pod/docker-compose.yml
##########
@@ -355,10 +355,34 @@ services:
     networks:
       apisix_net:
 
+  namesrv:
+    image: apacherocketmq/rocketmq:4.6.0
+    container_name: rmqnamesrv
+    restart: unless-stopped
+    ports:
+      - "9876:9876"
+    command: sh mqnamesrv
+    networks:
+      rocketmq_net:
+
+  broker:

Review comment:
       Ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander merged pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
spacewander merged pull request #5653:
URL: https://github.com/apache/apisix/pull/5653


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
tokers commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762515837



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do

Review comment:
       `buffers` is used as a key-value table rather than an array, so we should use `pairs` here; `ipairs` only visits the integer keys 1..n.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r759777287



##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,237 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which works as a rocketmq client driver for the ngx_lua nginx module.

Review comment:
       ```suggestion
   `rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
   ```
   
   We can merge the sentence from the next paragraph into it.

##########
File path: docs/zh/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,232 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 目录
+
+- [**简介**](#简介)
+- [**属性**](#属性)
+- [**工作原理**](#工作原理)
+- [**如何启用**](#如何启用)
+- [**测试插件**](#测试插件)
+- [**禁用插件**](#禁用插件)
+
+## 简介
+
+`rocketmq-logger` 是一个插件,可用作ngx_lua nginx 模块的 rocketmq 客户端驱动程序。

Review comment:
       Ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] shuaijinchao commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
shuaijinchao commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r759794711



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "object",
+            minProperties = 1,
+            patternProperties = {
+                [".*"] = {
+                    description = "the port of rocketmq nameserver",
+                    type = "integer",
+                    minimum = 1,
+                    maximum = 65535,
+                },
+            },
+        },
+        rocketmq_topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "rocketmq_topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.rocketmq_topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)
+    end
+
+    core.log.info("queue: ", result.sendResult.messageQueue.queueId)
+
+    return true
+end
+
+
+function _M.body_filter(conf, ctx)
+    log_util.collect_body(conf, ctx)
+end
+
+
+function _M.log(conf, ctx)
+    local entry
+    if conf.meta_format == "origin" then
+        entry = log_util.get_req_original(ctx, conf)
+        -- core.log.info("origin entry: ", entry)

Review comment:
       This commented-out debugging code can be removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tzssangglass commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762585556



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       Can we get the nameserver for the current error?

##########
File path: t/plugin/rocketmq-logger.t
##########
@@ -0,0 +1,1098 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                 topic = "test",
+                 key = "key1",
+                 nameserver_list = {
+                    "127.0.0.1:3"
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: missing nameserver list
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({topic = "test", key= "key1"})
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "nameserver_list" is required
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: wrong type of string
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                nameserver_list = {
+                    "127.0.0.1:3000"
+                },
+                timeout = "10",
+                topic ="test",
+                key= "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "timeout" validation failed: wrong type: expected integer, got string
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: set route(id: 1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                 "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9876" ],
+                                    "topic" : "test2",
+                                    "key" : "key1",
+                                    "timeout" : 1,
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: access
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 6: error log
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                             "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9877" ],
+                                    "topic" : "test2",
+                                    "producer_type": "sync",
+                                    "key" : "key1",
+                                    "batch_max_size": 1
+                             }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9877" ],
+                                    "topic" : "test2",
+                                    "producer_type": "sync",
+                                    "key" : "key1",
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, {method = "GET"})
+        }
+    }
+--- request
+GET /t
+--- error_log
+failed to send data to rocketmq topic
+[error]
+--- wait: 1
+
+
+
+=== TEST 7: set route(meta_format = origin, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": true,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to rocketmq: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+
+abcdef
+--- wait: 2
+
+
+
+=== TEST 9: set route(meta_format = origin, include_req_body = false)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to rocketmq: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 11: set route(meta_format = default)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2
+
+
+
+=== TEST 13: set route(id: 1), missing key field
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "timeout" : 1,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                 "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9876" ],
+                                    "topic" : "test2",
+                                    "timeout" : 1,
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 14: access, test key field is optional
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 15: set route(meta_format = default), missing key field
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 16: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2
+
+
+
+=== TEST 17: use the topic with 3 partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 18: report log to rocketmq by different partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "producer_type": "sync",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/queue: 1/,
+qr/queue: 0/,
+qr/queue: 2/]
+
+
+
+=== TEST 19: report log to rocketmq by different partitions in async mode
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "producer_type": "async",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/queue: 1/,
+qr/queue: 0/,
+qr/queue: 2/]
+
+
+
+=== TEST 20: update the nameserver_list, generate different rocketmq producers
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            ngx.sleep(0.5)
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            code, body = t('/apisix/admin/routes/1/plugins',
+                ngx.HTTP_PATCH,
+                 [[{
+                        "rocketmq-logger": {
+                            "nameserver_list" : [ "127.0.0.1:9876" ],
+                            "topic" : "test2",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false
+                        }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            code, body = t('/apisix/admin/routes/1/plugins',
+                ngx.HTTP_PATCH,
+                 [[{
+                        "rocketmq-logger": {
+                            "nameserver_list" :  [ "127.0.0.1:19876" ],

Review comment:
       Is `19876` a RocketMQ node? I can't find where it is enabled.

##########
File path: t/plugin/rocketmq-logger.t
##########
@@ -0,0 +1,1098 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                 topic = "test",
+                 key = "key1",
+                 nameserver_list = {
+                    "127.0.0.1:3"
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: missing nameserver list
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({topic = "test", key= "key1"})
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "nameserver_list" is required
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: wrong type of string
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                nameserver_list = {
+                    "127.0.0.1:3000"
+                },
+                timeout = "10",
+                topic ="test",
+                key= "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "timeout" validation failed: wrong type: expected integer, got string
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: set route(id: 1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                 "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9876" ],
+                                    "topic" : "test2",
+                                    "key" : "key1",
+                                    "timeout" : 1,
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: access
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 6: error log

Review comment:
       It would be better to rename this to `=== TEST 6: unavailable nameserver`.

##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")

Review comment:
       OK, I see this is already covered by test cases.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tzssangglass commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762583502



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")

Review comment:
       In some cases the nameserver_list will be updated. So we need a new producer with new nameserver_list.

##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")

Review comment:
       How do we renew a producer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] leslie-tsang commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
leslie-tsang commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760763518



##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,234 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
+
+This plugin provides the ability to push request log data as JSON objects to your external rocketmq clusters. If you do not receive the log data right away, don't worry: the logs are sent automatically once the timer in our Batch Processor expires.
+
+For more information on the Batch Processor in Apache APISIX, please refer to
+[Batch-Processor](../batch-processor.md).
+
+## Attributes
+
+| Name             | Type    | Requirement | Default        | Valid   | Description                                                                              |
+| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
+| nameserver_list  | object  | required    |                |         | An array of rocketmq nameservers.                                                               |
+| topic            | string  | required    |                |         | Target  topic to push data.                                                              |
+| key              | string  | optional    |                |         | Keys of messages to send.                                               |
+| tag              | string  | optional   |                |         | Tags of messages to send.                           |
+| timeout          | integer | optional    | 3              | [1,...] | Timeout for the upstream to send data.                                                   |
+| use_tls          | boolean | optional   | false          |         | Whether to enable TLS.                       |
+| access_key       | string  | optional   | ""             |         | access key for ACL, empty string means disable ACL.     |
+| secret_key       | string  | optional   | ""             |         | secret key for ACL.                          |
+| name             | string  | optional    | "rocketmq logger" |         | A unique identifier to identify the batch processor.                                      |
+| meta_format      | enum    | optional    | "default"      | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)|
+| batch_max_size   | integer | optional    | 1000           | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service.                         |
+| inactive_timeout | integer | optional    | 5              | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. |
+| buffer_duration  | integer | optional    | 60             | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
+| max_retry_count  | integer | optional    | 0              | [0,...] | Maximum number of retries before removing from the processing pipe line.                 |
+| retry_delay      | integer | optional    | 1              | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
+| include_req_body | boolean | optional    | false          | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
+| include_req_body_expr  | array  | optional    |          |         | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
+| include_resp_body| boolean | optional    | false         | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr  | array  | optional    |          |         | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
+
+### examples of meta_format
+
+- **default**:
+
+    ```json

Review comment:
       ```suggestion
   ```json
   ```
   Can we remove these spaces?

##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}

Review comment:
       ```suggestion
   local pairs    = pairs
   local type     = type
   local ipairs   = ipairs
   local plugin_name = "rocketmq-logger"
   local stale_timer_running = false
   local ngx = ngx
   local timer_at = ngx.timer.at
   local buffers = {}
   ```
   Would this be better?

##########
File path: ci/pod/docker-compose.yml
##########
@@ -355,10 +355,34 @@ services:
     networks:
       apisix_net:
 
+  namesrv:

Review comment:
       hello there, you can add a `hostname` to handle this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760838391



##########
File path: docs/zh/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,229 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 目录
+
+- [**简介**](#简介)
+- [**属性**](#属性)
+- [**工作原理**](#工作原理)
+- [**如何启用**](#如何启用)
+- [**测试插件**](#测试插件)
+- [**禁用插件**](#禁用插件)
+
+## 简介
+
+`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的请求日志到外部rocketmq集群。
+
+它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。

Review comment:
       ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tzssangglass commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762663779



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       If nameserver_list contains several nameservers and one of them is unavailable, and this message happens to be sent to that unavailable nameserver, could the log here print only the unavailable nameserver rather than the whole list?
   
   OK, if this is not convenient to do in this PR, we can optimise it later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762658128



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       Sorry, I don't understand what you mean. The nameserver list is appended to err, and the test case verifies the error log when sending fails.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762658128



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,256 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local ngx = ngx
+local timer_at = ngx.timer.at
+local buffers = {}
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+local schema = {
+    type = "object",
+    properties = {
+        meta_format = {
+            type = "string",
+            default = "default",
+            enum = {"default", "origin"},
+        },
+        nameserver_list = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "string"
+            }
+        },
+        topic = {type = "string"},
+        key = {type = "string"},
+        tag = {type = "string"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        use_tls = {type = "boolean", default = false},
+        access_key = {type = "string", default = ""},
+        secret_key = {type = "string", default = ""},
+        name = {type = "string", default = "rocketmq logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        include_req_body = {type = "boolean", default = false},
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array",
+                items = {
+                    type = "string"
+                }
+            }
+        },
+    },
+    required = {"nameserver_list", "topic"}
+}
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 402,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+local function create_producer(nameserver_list, producer_config)
+    core.log.info("create new rocketmq producer instance")
+    local prod = producer.new(nameserver_list, "apisixLogProducer")
+    if producer_config.use_tls then
+        prod:setUseTLS(true)
+    end
+    if producer_config.access_key ~= '' then
+        local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key)
+        prod:addRPCHook(aclHook)
+    end
+    prod:setTimeout(producer_config.timeout)
+    return prod
+end
+
+
+local function send_rocketmq_data(conf, log_message, prod)
+    local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key)
+    if not result then
+        return false, "failed to send data to rocketmq topic: " .. err ..
+                ", nameserver_list: " .. core.json.encode(conf.nameserver_list)

Review comment:
       Sorry, I don't understand what you mean. The nameserver list is appended to err, and the test case verifies the error log when sending fails, for example: https://github.com/yuz10/apisix/blob/master/t/plugin/rocketmq-logger.t#L862




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760837218



##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,234 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
+
+This plugin provides the ability to push request log data as JSON objects to your external rocketmq clusters. If you do not receive the log data right away, don't worry: the logs are sent automatically once the timer function in our Batch Processor expires.
+
+For more information on the Batch Processor in Apache APISIX, please refer to
+[Batch-Processor](../batch-processor.md).
+
+## Attributes
+
+| Name             | Type    | Requirement | Default        | Valid   | Description                                                                              |
+| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- |
+| nameserver_list  | array   | required    |                |         | An array of rocketmq nameservers.                                                               |
+| topic            | string  | required    |                |         | Target  topic to push data.                                                              |
+| key              | string  | optional    |                |         | Keys of messages to send.                                               |
+| tag              | string  | optional   |                |         | Tags of messages to send.                           |
+| timeout          | integer | optional    | 3              | [1,...] | Timeout for the upstream to send data.                                                   |
+| use_tls          | boolean | optional   | false          |         | Whether to enable TLS.                       |
+| access_key       | string  | optional   | ""             |         | access key for ACL, empty string means disable ACL.     |
+| secret_key       | string  | optional   | ""             |         | Secret key for ACL.                          |
+| name             | string  | optional    | "rocketmq logger" |         | A unique identifier to identify the batch processor.                                      |
+| meta_format      | enum    | optional    | "default"      | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)|
+| batch_max_size   | integer | optional    | 1000           | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service.                         |
+| inactive_timeout | integer | optional    | 5              | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. |
+| buffer_duration  | integer | optional    | 60             | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
+| max_retry_count  | integer | optional    | 0              | [0,...] | Maximum number of retries before removing from the processing pipe line.                 |
+| retry_delay      | integer | optional    | 1              | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
+| include_req_body | boolean | optional    | false          | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. |
+| include_req_body_expr  | array  | optional    |          |         | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. |
+| include_resp_body| boolean | optional    | false         | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. |
+| include_resp_body_expr  | array  | optional    |          |         | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. |
+
+### examples of meta_format
+
+- **default**:
+
+    ```json

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760838167



##########
File path: docs/en/latest/plugins/rocketmq-logger.md
##########
@@ -0,0 +1,234 @@
+---
+title: rocketmq-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters.
+
+This plugin provides the ability to push request log data as JSON objects to your external rocketmq clusters. If you do not receive the log data right away, don't worry: the logs are sent automatically once the timer function in our Batch Processor expires.

Review comment:
       The duplication is deleted now; it was copied from kafka-logger.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] membphis commented on pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
membphis commented on pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#issuecomment-987473648


   Thank you very much for your outstanding contribution @yuz10


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r762656485



##########
File path: t/plugin/rocketmq-logger.t
##########
@@ -0,0 +1,1098 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                 topic = "test",
+                 key = "key1",
+                 nameserver_list = {
+                    "127.0.0.1:3"
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: missing nameserver list
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({topic = "test", key= "key1"})
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "nameserver_list" is required
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: wrong type of string
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.rocketmq-logger")
+            local ok, err = plugin.check_schema({
+                nameserver_list = {
+                    "127.0.0.1:3000"
+                },
+                timeout = "10",
+                topic ="test",
+                key= "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "timeout" validation failed: wrong type: expected integer, got string
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: set route(id: 1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                 "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9876" ],
+                                    "topic" : "test2",
+                                    "key" : "key1",
+                                    "timeout" : 1,
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: access
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 6: error log
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                             "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9877" ],
+                                    "topic" : "test2",
+                                    "producer_type": "sync",
+                                    "key" : "key1",
+                                    "batch_max_size": 1
+                             }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9877" ],
+                                    "topic" : "test2",
+                                    "producer_type": "sync",
+                                    "key" : "key1",
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, {method = "GET"})
+        }
+    }
+--- request
+GET /t
+--- error_log
+failed to send data to rocketmq topic
+[error]
+--- wait: 1
+
+
+
+=== TEST 7: set route(meta_format = origin, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": true,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to rocketmq: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+
+abcdef
+--- wait: 2
+
+
+
+=== TEST 9: set route(meta_format = origin, include_req_body = false)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log
+send data to rocketmq: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 11: set route(meta_format = default)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2
+
+
+
+=== TEST 13: set route(id: 1), missing key field
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" : [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "timeout" : 1,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                 "rocketmq-logger": {
+                                    "nameserver_list" : [ "127.0.0.1:9876" ],
+                                    "topic" : "test2",
+                                    "timeout" : 1,
+                                    "batch_max_size": 1
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1980": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/hello"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 14: access, test key field is optional
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 15: set route(meta_format = default), missing key field
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test2",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 16: hit route, report log to rocketmq
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log_like eval
+qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/
+--- wait: 2
+
+
+
+=== TEST 17: use the topic with 3 partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 18: report log to rocketmq by different partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "producer_type": "sync",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/queue: 1/,
+qr/queue: 0/,
+qr/queue: 2/]
+
+
+
+=== TEST 19: report log to rocketmq by different partitions in async mode
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "rocketmq-logger": {
+                                "nameserver_list" :  [ "127.0.0.1:9876" ],
+                                "topic" : "test3",
+                                "producer_type": "async",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/queue: 1/,
+qr/queue: 0/,
+qr/queue: 2/]
+
+
+
+=== TEST 20: update the nameserver_list, generate different rocketmq producers
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            ngx.sleep(0.5)
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            code, body = t('/apisix/admin/routes/1/plugins',
+                ngx.HTTP_PATCH,
+                 [[{
+                        "rocketmq-logger": {
+                            "nameserver_list" : [ "127.0.0.1:9876" ],
+                            "topic" : "test2",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false
+                        }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            code, body = t('/apisix/admin/routes/1/plugins',
+                ngx.HTTP_PATCH,
+                 [[{
+                        "rocketmq-logger": {
+                            "nameserver_list" :  [ "127.0.0.1:19876" ],

Review comment:
       Port 19876 is used here only to test a change of the nameserver.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760723620



##########
File path: ci/pod/docker-compose.yml
##########
@@ -355,10 +355,34 @@ services:
     networks:
       apisix_net:
 
+  namesrv:

Review comment:
       The config in the container uses this name as a host address; if it is changed, the broker will fail to connect to the nameserver. I can't find a way to change the config file. Do you have any idea?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] yuz10 commented on a change in pull request #5653: feat: rocketmq logger

Posted by GitBox <gi...@apache.org>.
yuz10 commented on a change in pull request #5653:
URL: https://github.com/apache/apisix/pull/5653#discussion_r760837115



##########
File path: apisix/plugins/rocketmq-logger.lua
##########
@@ -0,0 +1,271 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core     = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require ("resty.rocketmq.producer")
+local acl_rpchook = require("resty.rocketmq.acl_rpchook")
+local batch_processor = require("apisix.utils.batch-processor")
+local plugin = require("apisix.plugin")
+
+local pairs    = pairs
+local type     = type
+local ipairs   = ipairs
+local plugin_name = "rocketmq-logger"
+local stale_timer_running = false
+local timer_at = ngx.timer.at
+local ngx = ngx
+local buffers = {}

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org