You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/02/02 09:07:51 UTC
[apisix] branch master updated: chore(kafka-logger): delay log
partition_id (#3481)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 436beab chore(kafka-logger): delay log partition_id (#3481)
436beab is described below
commit 436beab278b656a95b44beca2e4b2684f3d334a7
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Tue Feb 2 17:07:28 2021 +0800
chore(kafka-logger): delay log partition_id (#3481)
---
apisix/core/log.lua | 48 +++++++++++++++++++++++++++++++++++++++++
apisix/plugins/kafka-logger.lua | 7 +++---
2 files changed, 52 insertions(+), 3 deletions(-)
diff --git a/apisix/core/log.lua b/apisix/core/log.lua
index f36996a..c94d95d 100644
--- a/apisix/core/log.lua
+++ b/apisix/core/log.lua
@@ -18,7 +18,12 @@
local ngx = ngx
local ngx_log = ngx.log
local require = require
+local select = select
local setmetatable = setmetatable
+local tostring = tostring
+local unpack = unpack
+-- avoid loading other module since core.log is the most foundational one
+local tab_clear = require("table.clear")
local _M = {version = 0.4}
@@ -87,4 +92,47 @@ setmetatable(_M, {__index = function(self, cmd)
end})
+local delay_tab = setmetatable({
+ func = function() end,
+ args = {},
+ res = nil,
+ }, {
+ __tostring = function(self)
+ -- the `__tostring` will be called twice, the first to get the length and
+ -- the second to get the data
+ if self.res then
+ local res = self.res
+ -- avoid unexpected reference
+ self.res = nil
+ return res
+ end
+
+ local res, err = self.func(unpack(self.args))
+ if err then
+ ngx.log(ngx.WARN, "failed to exec: ", err)
+ end
+
+ -- avoid unexpected reference
+ tab_clear(self.args)
+ self.res = tostring(res)
+ return self.res
+ end
+})
+
+
+-- It works well with log.$level, eg: log.info(..., log.delay_exec(func, ...))
+-- Should not use it elsewhere.
+function _M.delay_exec(func, ...)
+ delay_tab.func = func
+
+ tab_clear(delay_tab.args)
+ for i = 1, select('#', ...) do
+ delay_tab.args[i] = select(i, ...)
+ end
+
+ delay_tab.res = nil
+ return delay_tab
+end
+
+
return _M
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index a545b3b..9bc8103 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -70,7 +70,7 @@ function _M.check_schema(conf)
end
-local function partition_id(sendbuffer, topic, log_message)
+local function get_partition_id(sendbuffer, topic, log_message)
if not sendbuffer.topics[topic] then
core.log.info("current topic in sendbuffer has no message")
return nil
@@ -113,8 +113,9 @@ local function send_kafka_data(conf, log_message, prod)
end
local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
- core.log.info("partition_id: ", partition_id(prod.sendbuffer,
- conf.kafka_topic, log_message))
+ core.log.info("partition_id: ",
+ core.log.delay_exec(get_partition_id,
+ prod.sendbuffer, conf.kafka_topic, log_message))
if not ok then
return nil, "failed to send data to Kafka topic: " .. err