You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2020/02/12 06:29:34 UTC

[GitHub] [incubator-apisix] sshniro opened a new pull request #1121: Batch processor implementation to aggregate logs in batch

sshniro opened a new pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121
 
 
   Fix for #1076 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380445446
 
 

 ##########
 File path: t/plugin/batch-processor.t
 ##########
 @@ -0,0 +1,367 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+                inactive_timeout = 1
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try success
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local retry = false
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                if not retry then
+                    retry = true
+                    return false
+                end
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: Exceeding max retry count
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return false
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 7: two batches
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local count = 0
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                count = count + 1
+                core.log.info("batch[", count , "] sent")
 
 Review comment:
   Added additional test case to json encode and print the elements.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379939237
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   Thanks. Refactored the debug logs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380402546
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
 
 Review comment:
   any news?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380126166
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
+        create_buffer_timer(self)
+    end
+
+end
+
+
+function Batch_Processor:process_buffer()
+    if #self.entry_buffer.entries > 0 then
+        ngx_log(DEBUG, fmt("tranferring buffer entries to processing pipe line, buffercount[%d]", #self.entry_buffer.entries))
+        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
+    if(#self.batch_to_process > 0) then
+        repeat
 
 Review comment:
   Thanks, refactored.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379939571
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   This is used as a forward declaration in the local variable( line 32). Because the create buffer timer is referenced in a previous method (line 93) to delay the timer if there is any activity found in the buffer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379939571
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   This is used as a forward declaration in the local variable( line 32). Because the create buffer timer is referenced in a previous method (flush_buffer, line 93) to delay the timer if there is any activity found in the buffer.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380445490
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if not ok then
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries",
+                batch_processor.name, batch.retry_count))
+        end
+        return
+    end
+
+    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+            batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378760095
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   Hi, @membphis I've tried the `core.log.debug` option but the debug logs do not appear in the `t/servroot/logs/error.log` file and only `ngx_log(DEBUG,"")` works.
   
   Do I have to enable any parameter to do this?
   The `log_level('debug');` option is also set in the test file.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944773
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   good style for this case:  `function create_buffer_timer(batch_processor)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379900594
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   @sshniro the #1124 has been merged, you can continue your job now ^_^

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#issuecomment-588053122
 
 
   Hi @membphis, all the review comments have been fixed now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] moonming closed pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
moonming closed pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380445570
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380125618
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
 
 Review comment:
   Refactored accordingly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378772964
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   that is a bug, I will submit a new PR to fix this bug later

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379939191
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
 
 Review comment:
   Reformatted the code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on issue #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on issue #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#issuecomment-588184428
 
 
   @sshniro please rebase your branch, I updated the tools about checking code style.
   
   I think the CI will fail after this PR was merged to `master` branch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378638012
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   code style: `local function create_buffer_timer(....)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644301
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
+        create_buffer_timer(self)
+    end
+
+end
+
+
+function Batch_Processor:process_buffer()
+    if #self.entry_buffer.entries > 0 then
+        ngx_log(DEBUG, fmt("tranferring buffer entries to processing pipe line, buffercount[%d]", #self.entry_buffer.entries))
+        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
+    if(#self.batch_to_process > 0) then
+        repeat
 
 Review comment:
   Avoid using `table.move`, this API is slow.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379290144
 
 

 ##########
 File path: t/plugin/batch-processor-refac.t
 ##########
 @@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: wrong configuration parameters
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+=== TEST 3: batch processor timeout exceeded
 
 Review comment:
   Thanks, having issues reindexing in Ubuntu 18. I will update it during the weekend.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380126056
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   Refactored similar lines.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945418
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
 
 Review comment:
   please take a look at this new style:
   
   ```lua
   local ok , err = ...
   if not ok then
       batch.retry_count = batch.retry_count + 1
       ... 
   end
   
   core.log.debug(...)
   return
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380200719
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   Resolved

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] moonming merged pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
moonming merged pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637883
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
 
 Review comment:
   please remove this useless blank line

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403114
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if not ok then
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries",
+                batch_processor.name, batch.retry_count))
+        end
+        return
+    end
+
+    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+            batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
 
 Review comment:
   name style: `is_timer_running`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro opened a new pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro opened a new pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121
 
 
   Fix for #1076 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379939157
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
+        create_buffer_timer(self)
+    end
+
+end
+
+
+function Batch_Processor:process_buffer()
+    if #self.entry_buffer.entries > 0 then
+        ngx_log(DEBUG, fmt("tranferring buffer entries to processing pipe line, buffercount[%d]", #self.entry_buffer.entries))
+        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
+    if(#self.batch_to_process > 0) then
+        repeat
 
 Review comment:
   Hi @membphis can I get some guidance on this? To avoid using the remove method, 
   1. Should I use a deep copy approach 
   ```lua
   local temp = deepcopy(self.batch_to_process)
   self.batch_to_process = {}
   -- start processing the temp batch
   ```
   
   2. Or use a [reindexing approach](https://stackoverflow.com/a/53038524/6164611) as discussed in this answer?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944560
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   code style: this line is too long
   
   The single line of code should be less than 80 characters, and please fix other similar points.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637750
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   change to `core.log.debug(...)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945467
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   same problem with function `execute_func`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380402502
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if not ok then
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries",
+                batch_processor.name, batch.retry_count))
+        end
+        return
+    end
+
+    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+            batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
 
 Review comment:
   remove this blank line

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378760095
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   Hi, @membphis I've tried the `core.log.debug` option but the logs do not appear in the `t/servroot/logs/error.log` file and only `ngx_log(DEBUG,"")` works.
   
   Do I have to enable any parameter to do this?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380445446
 
 

 ##########
 File path: t/plugin/batch-processor.t
 ##########
 @@ -0,0 +1,367 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+                inactive_timeout = 1
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try success
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local retry = false
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                if not retry then
+                    retry = true
+                    return false
+                end
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: Exceeding max retry count
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return false
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 7: two batches
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local count = 0
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                count = count + 1
+                core.log.info("batch[", count , "] sent")
 
 Review comment:
   Added an additional test case (test case 10) to json encode and print the elements.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643354
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643680
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
 
 Review comment:
   code style, this is better:
   
   ```lua
   if self.batch_max_size <= #entries then
       ... ...
       return
   end
   
   if not self.isTimerRunning then
       ... ...
   end
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637838
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
 
 Review comment:
   ditto

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#issuecomment-585051696
 
 
   @membphis please do have a look.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380445469
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if not ok then
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries",
+                batch_processor.name, batch.retry_count))
+        end
+        return
+    end
+
+    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+            batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
 
 Review comment:
   Refactored.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on issue #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on issue #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#issuecomment-588677956
 
 
   @sshniro many thx ^_^

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379231789
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   I have submitted a PR to fix it: https://github.com/apache/incubator-apisix/pull/1124
   
   You  can rebase your branch after https://github.com/apache/incubator-apisix/pull/1124 merged.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on issue #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#issuecomment-589528645
 
 
   Thanks for the pointers @membphis , now I will refactor the UDP logger to use this util.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944685
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
 
 Review comment:
   code style: `code.log.debug(...)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
sshniro commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378760095
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
 
 Review comment:
   Hi, @membphis I've tried the `core.log.debug` option but the logs do not appear in the `t/servroot/logs/error.log` file and only `ngx_log(DEBUG,"")` works.
   
   Do I have to enable any parameter to do this?
   The `log_level('debug');` option is also set in the test file.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403734
 
 

 ##########
 File path: t/plugin/batch-processor.t
 ##########
 @@ -0,0 +1,367 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+                inactive_timeout = 1
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try success
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local retry = false
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                if not retry then
+                    retry = true
+                    return false
+                end
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: Exceeding max retry count
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return false
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 7: two batches
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local count = 0
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                count = count + 1
+                core.log.info("batch[", count , "] sent")
 
 Review comment:
   we need to print the `elements` for checking if it all right.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644474
 
 

 ##########
 File path: t/plugin/batch-processor-refac.t
 ##########
 @@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: wrong configuration parameters
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+=== TEST 3: batch processor timeout exceeded
 
 Review comment:
   https://github.com/apache/incubator-apisix/blob/fda20d99d55d91905622b9d780e4dce79d128e76/Contributing.md#check-code-style-and-test-case-style
   
   check your code style and test case style

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945034
 
 

 ##########
 File path: lua/apisix/plugins/batch-processor.lua
 ##########
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default= 0},
+        retry_delay = {type = "integer", minimum = 0, default= 1},
+        buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted
+        inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive
+        batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name))
+
+    else
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, batch.retry_count))
+        end
+    end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+    if premature then
+        return
+    end
+
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.isTimerRunning = false
+        return
+    end
+
+    -- buffer duration did not exceed or the buffer is active, extending the timer
+    ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+
+    if not ok then
+        return err
+    end
+
+    if not(type(func) == "function") then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        isTimerRunning = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        ngx_log(DEBUG, fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    elseif not self.isTimerRunning then
+        create_buffer_timer(self)
+    end
+
+end
+
+
+function Batch_Processor:process_buffer()
+    if #self.entry_buffer.entries > 0 then
+        ngx_log(DEBUG, fmt("tranferring buffer entries to processing pipe line, buffercount[%d]", #self.entry_buffer.entries))
+        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
+    if(#self.batch_to_process > 0) then
+        repeat
 
 Review comment:
   How about this style?
   
   ```lua
   for _, batch in ipairs(self.batch_to_process) do
       schedule_func_exec(self, 0, batch)
   end
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services