You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by we...@apache.org on 2020/02/20 07:25:29 UTC
[incubator-apisix] branch master updated: feature: Batch processor
implementation to aggregate logs in batch (#1121)
This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git
The following commit(s) were added to refs/heads/master by this push:
new d50727b feature: Batch processor implementation to aggregate logs in batch (#1121)
d50727b is described below
commit d50727bcf08ae3ac271e5b3eb299dfd643f85715
Author: Nirojan Selvanathan <ss...@gmail.com>
AuthorDate: Thu Feb 20 08:25:20 2020 +0100
feature: Batch processor implementation to aggregate logs in batch (#1121)
---
doc/batch-processor.md | 72 ++++++
lua/apisix/plugins/batch-processor.lua | 180 +++++++++++++++
t/plugin/batch-processor.t | 409 +++++++++++++++++++++++++++++++++
3 files changed, 661 insertions(+)
diff --git a/doc/batch-processor.md b/doc/batch-processor.md
new file mode 100644
index 0000000..d4f06b8
--- /dev/null
+++ b/doc/batch-processor.md
@@ -0,0 +1,72 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# Batch Processor
+
+The batch processor can be used to aggregate entries (logs or any other data) and process them in a batch.
+When `batch_max_size` is set to 1, the processor executes each entry immediately. Setting `batch_max_size` to more
+than 1 starts aggregating the entries until the batch reaches the max size or a timeout expires.
+
+
+## Configurations
+
+The only mandatory parameter to create a batch processor is a function. The function will be executed when the batch reaches the max size
+or when the inactive timeout or the buffer duration is exceeded.
+
+|Name |Requirement |Description|
+|------- |----- |------|
+|name |optional |A unique identifier to identify the batch processor, default is "log buffer"|
+|batch_max_size |optional |Max size of each batch, default is 1000|
+|inactive_timeout|optional |Maximum time in seconds the buffer may stay inactive (no new entries) before it is flushed, default is 5|
+|buffer_duration|optional |Maximum age in seconds of the oldest entry in a batch before the batch must be processed, default is 60|
+|max_retry_count|optional |Maximum number of retries before removing from the processing pipeline; default is zero|
+|retry_delay |optional |Number of seconds the process execution should be delayed if the execution fails; default is 1|
+
+
+The following code shows an example of how to use a batch processor. The batch processor takes a function to be executed as the first
+argument and the batch configuration as the second parameter.
+
+
+```lua
+local bp = require("apisix.plugins.batch-processor")
+local func_to_execute = function(entries)
+ -- serialize to json array core.json.encode(entries)
+ -- process/send data
+ return true
+ end
+
+local config = {
+ max_retry_count = 2,
+ buffer_duration = 60,
+ inactive_timeout = 5,
+ batch_max_size = 1,
+ retry_delay = 0
+}
+
+
+local batch_processor, err = bp:new(func_to_execute, config)
+
+if batch_processor then
+ batch_processor:push({hello='world'})
+end
+```
+
+Note: Please make sure the batch max size (entry count) is within the limits of the function execution.
+The timer to flush the batch runs based on the `inactive_timeout` configuration. Thus, for optimal usage,
+keep the `inactive_timeout` smaller than the `buffer_duration`.
diff --git a/lua/apisix/plugins/batch-processor.lua b/lua/apisix/plugins/batch-processor.lua
new file mode 100644
index 0000000..d545c57
--- /dev/null
+++ b/lua/apisix/plugins/batch-processor.lua
@@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+ __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+-- Schema used by Batch_Processor:new to validate the `config` argument.
+-- NOTE(review): the defaults below appear to be written into `config` by
+-- core.schema.check (the tests expect the default name without passing one)
+-- -- confirm against the schema library in use.
+local schema = {
+ type = "object",
+ properties = {
+ -- label used in the processor's log messages
+ name = {type = "string", default = "log buffer"},
+ -- compared against a batch's retry_count when the user function fails
+ max_retry_count = {type = "integer", minimum = 0, default= 0},
+ -- timer delay used when a failed batch is scheduled again
+ retry_delay = {type = "integer", minimum = 0, default= 1},
+ -- compared against the time elapsed since the first buffered entry
+ buffer_duration = {type = "integer", minimum = 1, default= 60},
+ -- compared against the time elapsed since the last pushed entry; also
+ -- used as the delay of the buffer flush timer
+ inactive_timeout = {type = "integer", minimum = 1, default= 5},
+ -- number of buffered entries that triggers processing; 1 is treated
+ -- specially by push()
+ batch_max_size = {type = "integer", minimum = 1, default= 1000},
+ }
+}
+
+
+--- Hand a batch over to `execute_func` through a timer.
+-- @param batch_processor processor instance forwarded to `execute_func`
+-- @param delay delay passed to the timer API
+-- @param batch batch table forwarded to `execute_func`
+-- Returns nothing; a failure to create the timer is only logged.
+local function schedule_func_exec(batch_processor, delay, batch)
+    local timer_ok, timer_err = timer_at(delay, execute_func, batch_processor, batch)
+    if timer_ok then
+        return
+    end
+
+    core.log.error("failed to create process timer: ", timer_err)
+end
+
+
+--- Timer callback: run the user function on one batch and handle retries.
+-- Assigned to the forward-declared local `execute_func` so that
+-- `schedule_func_exec` can reference it.
+-- @param premature flag supplied by the timer; when truthy nothing is done
+-- @param batch_processor processor instance (reads func, name,
+--        max_retry_count and retry_delay)
+-- @param batch table with `entries` and `retry_count`; `retry_count` is
+--        incremented on every failed attempt
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if ok then
+        core.log.debug(fmt("Batch Processor[%s] successfully processed the entries",
+                           batch_processor.name))
+        return
+    end
+
+    batch.retry_count = batch.retry_count + 1
+
+    -- retry_count now holds the number of failed attempts. Using `<=` lets a
+    -- batch be retried exactly max_retry_count times after the initial
+    -- attempt; with `<` the last configured retry was never performed.
+    if batch.retry_count <= batch_processor.max_retry_count then
+        core.log.warn(fmt("Batch Processor[%s] failed to process entries: ",
+                          batch_processor.name), err)
+        schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        return
+    end
+
+    -- report the configured limit rather than the failure counter, which is
+    -- already one past the limit at this point
+    core.log.error(fmt(("Batch Processor[%s] exceeded the max_retry_count[%d] "
+                        .. "dropping the entries"),
+                       batch_processor.name, batch_processor.max_retry_count))
+end
+
+
+--- Timer callback deciding whether the buffer must be flushed now.
+-- The buffer is flushed when either the time since the last pushed entry
+-- reached `inactive_timeout` or the time since the first buffered entry
+-- reached `buffer_duration`; otherwise a new buffer timer is created.
+-- @param premature flag supplied by the timer; when truthy nothing is done
+-- @param batch_processor processor instance whose buffer is inspected
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
+    local name = batch_processor.name
+
+    -- the second check is only evaluated when the first one does not hold
+    local flush_due = now() - batch_processor.last_entry_t
+                      >= batch_processor.inactive_timeout
+    if not flush_due then
+        flush_due = now() - batch_processor.first_entry_t
+                    >= batch_processor.buffer_duration
+    end
+
+    if not flush_due then
+        -- neither limit reached yet: keep waiting with a fresh timer
+        core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", name))
+        create_buffer_timer(batch_processor)
+        return
+    end
+
+    core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+                       name))
+    batch_processor:process_buffer()
+    batch_processor.is_timer_running = false
+end
+
+
+--- Start the timer that will later call `flush_buffer` for this processor.
+-- Assigned to the forward-declared local `create_buffer_timer`.
+-- The timer delay is the processor's `inactive_timeout`. On success the
+-- processor is flagged with `is_timer_running = true`; on failure the error
+-- is logged and the flag is left untouched.
+-- @param batch_processor processor instance forwarded to `flush_buffer`
+function create_buffer_timer(batch_processor)
+    local timer_ok, timer_err = timer_at(batch_processor.inactive_timeout,
+                                         flush_buffer, batch_processor)
+    if timer_ok then
+        batch_processor.is_timer_running = true
+        return
+    end
+
+    core.log.error("failed to create buffer timer: ", timer_err)
+end
+
+
+--- Create a new batch processor.
+-- @param func function called with the array of entries of a batch; a falsy
+--        first return value is treated as a failure (second value is logged)
+-- @param config optional table validated against `schema`
+-- @return the processor instance on success
+-- @return nil and an error message when `config` fails validation or `func`
+--         is not a function
+function Batch_Processor:new(func, config)
+    -- The configuration is optional: fall back to an empty table instead of
+    -- failing on a nil index below.
+    -- NOTE(review): this relies on core.schema.check writing the schema
+    -- defaults into the table it validates -- confirm.
+    config = config or {}
+
+    local ok, err = core.schema.check(schema, config)
+    if not ok then
+        -- Must be `nil, err`: returning only `err` would hand the caller a
+        -- truthy string in place of the processor instance.
+        return nil, err
+    end
+
+    if type(func) ~= "function" then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        -- batches waiting to be scheduled by process_buffer()
+        batch_to_process = {},
+        -- batch currently being filled by push()
+        entry_buffer = { entries = {}, retry_count = 0 },
+        is_timer_running = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+--- Add one entry to the processor.
+-- With `batch_max_size == 1` the entry is scheduled for processing right
+-- away and never buffered. Otherwise it is appended to the buffer, the
+-- first/last entry timestamps are updated, the buffer is processed once it
+-- holds `batch_max_size` entries, and a buffer timer is started if none is
+-- running.
+-- @param entry value appended to the batch handed to the user function
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+        -- Stop here: falling through would also buffer the entry and flush
+        -- it below, so the user function would receive it twice.
+        return
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        -- first entry of a fresh buffer: start measuring buffer_duration
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        core.log.debug(fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    end
+
+    if not self.is_timer_running then
+        create_buffer_timer(self)
+    end
+end
+
+
+--- Move the buffered entries to the pending list and schedule every
+-- pending batch for immediate execution.
+-- A non-empty buffer is appended to `batch_to_process` and replaced by a
+-- fresh empty one; afterwards each pending batch is scheduled with a zero
+-- delay and the pending list is reset.
+function Batch_Processor:process_buffer()
+    local buffered = self.entry_buffer
+    local buffered_count = #buffered.entries
+
+    -- nothing to transfer when the buffer holds no entries
+    if buffered_count > 0 then
+        core.log.debug(fmt("tranferring buffer entries to processing pipe line, buffercount[%d]",
+                           buffered_count))
+        self.batch_to_process[#self.batch_to_process + 1] = buffered
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
+    local pending = self.batch_to_process
+    for _, pending_batch in ipairs(pending) do
+        schedule_func_exec(self, 0, pending_batch)
+    end
+    self.batch_to_process = {}
+end
+
+
+return Batch_Processor
diff --git a/t/plugin/batch-processor.t b/t/plugin/batch-processor.t
new file mode 100644
index 0000000..1fd7795
--- /dev/null
+++ b/t/plugin/batch-processor.t
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 1,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ return true
+ end
+ local log_buffer, err = Batch:new("", config)
+
+ if log_buffer then
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ end
+
+ if not log_buffer then
+ ngx.say("failed")
+ end
+
+ }
+ }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local func_to_send = function(elements)
+ return true
+ end
+
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 1,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ inactive_timeout = 1
+ }
+ local func_to_send = function(elements)
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try success
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local core = require("apisix.core")
+ local retry = false
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ if not retry then
+ retry = true
+ return false
+ end
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: Exceeding max retry count
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ return false
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 7: two batches
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local core = require("apisix.core")
+ local count = 0
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ count = count + 1
+ core.log.info("batch[", count , "] sent")
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch[1] sent
+batch[2] sent
+--- wait: 0.5
+
+
+
+=== TEST 8: batch processor retry count 0 and fail processing
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 0,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ return false
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+Batch Processor[log buffer] failed to process entries
+--- error_log
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 9: batch processor timeout exceeded
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ buffer_duration = 60,
+ inactive_timeout = 1,
+ }
+ local func_to_send = function(elements)
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({hello='world'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 10: json encode and log elements
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.plugins.batch-processor")
+ local core = require("apisix.core")
+ local config = {
+ max_retry_count = 2,
+ batch_max_size = 2,
+ process_delay = 0,
+ retry_delay = 0,
+ }
+ local func_to_send = function(elements)
+ core.log.info(core.json.encode(elements))
+ return true
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({msg='1'})
+ log_buffer:push({msg='2'})
+ log_buffer:push({msg='3'})
+ log_buffer:push({msg='4'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+[{"msg":"1"},{"msg":"2"}]
+[{"msg":"3"},{"msg":"4"}]
+--- wait: 0.5