You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2021/02/02 09:07:51 UTC

[apisix] branch master updated: chore(kafka-logger): delay log partition_id (#3481)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 436beab  chore(kafka-logger): delay log partition_id (#3481)
436beab is described below

commit 436beab278b656a95b44beca2e4b2684f3d334a7
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Tue Feb 2 17:07:28 2021 +0800

    chore(kafka-logger): delay log partition_id (#3481)
---
 apisix/core/log.lua             | 48 +++++++++++++++++++++++++++++++++++++++++
 apisix/plugins/kafka-logger.lua |  7 +++---
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/apisix/core/log.lua b/apisix/core/log.lua
index f36996a..c94d95d 100644
--- a/apisix/core/log.lua
+++ b/apisix/core/log.lua
@@ -18,7 +18,12 @@
 local ngx = ngx
 local ngx_log  = ngx.log
 local require  = require
+local select = select
 local setmetatable = setmetatable
+local tostring = tostring
+local unpack = unpack
+-- avoid loading other modules since core.log is the most foundational one
+local tab_clear = require("table.clear")
 
 
 local _M = {version = 0.4}
@@ -87,4 +92,47 @@ setmetatable(_M, {__index = function(self, cmd)
 end})
 
 
+local delay_tab = setmetatable({  -- shared holder for a single deferred call
+    func = function() end,
+    args = {},
+    nargs = 0,  -- real argument count: `#args` is unreliable once an arg is nil
+    res = nil,
+    }, {
+    __tostring = function(self)
+        -- called twice: the first call gets the length, the second gets the data
+        if self.res then
+            local res = self.res
+            -- avoid unexpected reference
+            self.res = nil
+            return res
+        end
+
+        local res, err = self.func(unpack(self.args, 1, self.nargs))
+        if err then
+            ngx.log(ngx.WARN, "failed to exec: ", err)
+        end
+
+        -- avoid unexpected reference
+        tab_clear(self.args)
+        self.nargs = 0
+        self.res = tostring(res)
+        return self.res
+    end
+})
+
+
+-- Defers func(...) until the returned (shared, reused) object is stringified.
+-- Only for log.$level, eg: log.info(..., log.delay_exec(func, ...))
+function _M.delay_exec(func, ...)
+    delay_tab.func = func
+    delay_tab.nargs = select('#', ...)
+    tab_clear(delay_tab.args)
+    for i = 1, delay_tab.nargs do
+        delay_tab.args[i] = select(i, ...)
+    end
+    delay_tab.res = nil
+    return delay_tab
+end
+
+
 return _M
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index a545b3b..9bc8103 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -70,7 +70,7 @@ function _M.check_schema(conf)
 end
 
 
-local function partition_id(sendbuffer, topic, log_message)
+local function get_partition_id(sendbuffer, topic, log_message)
     if not sendbuffer.topics[topic] then
         core.log.info("current topic in sendbuffer has no message")
         return nil
@@ -113,8 +113,9 @@ local function send_kafka_data(conf, log_message, prod)
     end
 
     local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
-    core.log.info("partition_id: ", partition_id(prod.sendbuffer,
-            conf.kafka_topic, log_message))
+    core.log.info("partition_id: ",
+                  core.log.delay_exec(get_partition_id,
+                                      prod.sendbuffer, conf.kafka_topic, log_message))
 
     if not ok then
         return nil, "failed to send data to Kafka topic: " .. err