You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2020/03/27 15:48:37 UTC

[GitHub] [incubator-apisix] sshniro opened a new pull request #1355: Updating the UDP logger to use the batch processor util

sshniro opened a new pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355
 
 
   ### Summary
   Updates the UDP logger to use the batch processor util
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402149113
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
 
 Review comment:
   Shall put a nil check in the batch processor when pushing the entry?
   
   https://github.com/apache/incubator-apisix/blob/69d2039520d128922c95e1c7b5d19f7fecd4b6b5/apisix/utils/batch-processor.lua#L136

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401363228
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
 
 Review comment:
   this function can be a top-level local function

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402149234
 
 

 ##########
 File path: doc/plugins/udp-logger.md
 ##########
 @@ -33,15 +33,21 @@ This will provide the ability to send Log data requests as JSON objects to Monit
 
 ## Attributes
 
-|Name          |Requirement  |Description|
-|---------     |--------|-----------|
-| host |required| IP address or the Hostname of the UDP server.|
-| port |required| Target upstream port.|
-| timeout |optional|Timeout for the upstream to send data.|
+|Name           |Requirement    |Description|
+|---------      |--------       |-----------|
+|host           |required       | IP address or the Hostname of the UDP server.|
+|port           |required       | Target upstream port.|
+|timeout        |optional       |Timeout for the upstream to send data.|
+|name           |optional       |A unique identifier to identity the batch processor|
+|batch_max_size |optional       |Max size of each batch, default is 1000|
+|inactive_timeout|optional      |Maximum age in seconds when the buffer will be flushed if inactive, default is 5s|
+|buffer_duration|optional       |Maximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 60|
+|max_retry_count|optional       |Maximum number of retries before removing from the processing pipe line; default is zero|
+|retry_delay    |optional       |Number of seconds the process execution should be delayed if the execution fails; default is 1|
 
 Review comment:
   Okay

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on issue #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on issue #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#issuecomment-608122373
 
 
   @membphis rebased.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on issue #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on issue #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#issuecomment-606423207
 
 
   @membphis The following PR is also updated with the review comments of the tcp logger

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402146777
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
+    end
+
+    local config = {
+        name = conf.name,
+        retry_delay = conf.retry_delay,
+        batch_max_size = conf.batch_max_size,
+        max_retry_count = conf.max_retry_count,
+        buffer_duration = conf.buffer_duration,
+        inactive_timeout = conf.inactive_timeout,
+    }
+
+    local err
+    log_buffer, err = batch_processor:new(func, config)
+
+    if not log_buffer then
+        core.log.error("error when creating the batch processor: ", err)
+        return
+    end
+
+    buffers[entry.route_id] = log_buffer
 
 Review comment:
   Hi @membphis any suggestion on how to handle this? periodically check the buffer and remove stale objects?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402148439
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
 
 Review comment:
   If we move it to a top-level function then we also need to persist the entire `conf` object coming from the M.log function. As it is used by the `send_udp_data` function.
   
   Now we are using the conf object via the closure. Shall I move it to top then?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on issue #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on issue #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#issuecomment-607040726
 
 
   @sshniro please rebase your branch

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] moonming merged pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
moonming merged pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401362947
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
 
 Review comment:
   if the `data` is a `nil` value, do we should skip it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401363026
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
 
 Review comment:
   it is useless if we send a `nil` value

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402630157
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
 
 Review comment:
   Resolved.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r400666937
 
 

 ##########
 File path: doc/plugins/udp-logger.md
 ##########
 @@ -33,15 +33,21 @@ This will provide the ability to send Log data requests as JSON objects to Monit
 
 ## Attributes
 
-|Name          |Requirement  |Description|
-|---------     |--------|-----------|
-| host |required| IP address or the Hostname of the UDP server.|
-| port |required| Target upstream port.|
-| timeout |optional|Timeout for the upstream to send data.|
+|Name           |Requirement    |Description|
+|---------      |--------       |-----------|
+|host           |required       | IP address or the Hostname of the UDP server.|
+|port           |required       | Target upstream port.|
+|timeout        |optional       |Timeout for the upstream to send data.|
+|name           |optional       |A unique identifier to identity the batch processor|
+|batch_max_size |optional       |Max size of each batch, default is 1000|
+|inactive_timeout|optional      |Maximum age in seconds when the buffer will be flushed if inactive, default is 5s|
+|buffer_duration|optional       |Maximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 60|
+|max_retry_count|optional       |Maximum number of retries before removing from the processing pipe line; default is zero|
+|retry_delay    |optional       |Number of seconds the process execution should be delayed if the execution fails; default is 1|
 
 Review comment:
   Do we need a retry attribute for UDP, because the UDP does not check for acks?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on issue #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on issue #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#issuecomment-608234823
 
 
   I have now restarted Travis's job and failed before.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r402149113
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
 
 Review comment:
   Shall put a nil check in the batch processor when pushing the entry?
   
   https://github.com/apache/incubator-apisix/blob/69d2039520d128922c95e1c7b5d19f7fecd4b6b5/apisix/utils/batch-processor.lua#L136

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401359249
 
 

 ##########
 File path: doc/plugins/udp-logger.md
 ##########
 @@ -33,15 +33,21 @@ This will provide the ability to send Log data requests as JSON objects to Monit
 
 ## Attributes
 
-|Name          |Requirement  |Description|
-|---------     |--------|-----------|
-| host |required| IP address or the Hostname of the UDP server.|
-| port |required| Target upstream port.|
-| timeout |optional|Timeout for the upstream to send data.|
+|Name           |Requirement    |Description|
+|---------      |--------       |-----------|
+|host           |required       | IP address or the Hostname of the UDP server.|
+|port           |required       | Target upstream port.|
+|timeout        |optional       |Timeout for the upstream to send data.|
+|name           |optional       |A unique identifier to identity the batch processor|
+|batch_max_size |optional       |Max size of each batch, default is 1000|
+|inactive_timeout|optional      |Maximum age in seconds when the buffer will be flushed if inactive, default is 5s|
+|buffer_duration|optional       |Maximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 60|
+|max_retry_count|optional       |Maximum number of retries before removing from the processing pipe line; default is zero|
+|retry_delay    |optional       |Number of seconds the process execution should be delayed if the execution fails; default is 1|
 
 Review comment:
   I don't think the `retry` attribute is needed

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis edited a comment on issue #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis edited a comment on issue #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#issuecomment-608234823
 
 
   I have restarted Travis's job and failed before.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util
URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401362090
 
 

 ##########
 File path: lua/apisix/plugins/udp-logger.lua
 ##########
 @@ -46,38 +51,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            core.log.error('error occurred while encoding the token: ', err)
+        end
+
+        return send_udp_data(conf, data)
+    end
+
+    local config = {
+        name = conf.name,
+        retry_delay = conf.retry_delay,
+        batch_max_size = conf.batch_max_size,
+        max_retry_count = conf.max_retry_count,
+        buffer_duration = conf.buffer_duration,
+        inactive_timeout = conf.inactive_timeout,
+    }
+
+    local err
+    log_buffer, err = batch_processor:new(func, config)
+
+    if not log_buffer then
+        core.log.error("error when creating the batch processor: ", err)
+        return
+    end
+
+    buffers[entry.route_id] = log_buffer
 
 Review comment:
   We need a way to control the number of objects in `buffers` to avoid memory overflow.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services