Posted to notifications@apisix.apache.org by we...@apache.org on 2020/02/20 07:25:29 UTC

[incubator-apisix] branch master updated: feature: Batch processor implementation to aggregate logs in batch (#1121)

This is an automated email from the ASF dual-hosted git repository.

wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new d50727b  feature: Batch processor implementation to aggregate logs in batch  (#1121)
d50727b is described below

commit d50727bcf08ae3ac271e5b3eb299dfd643f85715
Author: Nirojan Selvanathan <ss...@gmail.com>
AuthorDate: Thu Feb 20 08:25:20 2020 +0100

    feature: Batch processor implementation to aggregate logs in batch  (#1121)
---
 doc/batch-processor.md                 |  72 ++++++
 lua/apisix/plugins/batch-processor.lua | 180 +++++++++++++++
 t/plugin/batch-processor.t             | 409 +++++++++++++++++++++++++++++++++
 3 files changed, 661 insertions(+)

diff --git a/doc/batch-processor.md b/doc/batch-processor.md
new file mode 100644
index 0000000..d4f06b8
--- /dev/null
+++ b/doc/batch-processor.md
@@ -0,0 +1,72 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# Batch Processor
+
+The batch processor can be used to aggregate entries (logs or any other data) and process them in a batch.
+When `batch_max_size` is set to 1, the processor executes each entry immediately. Setting the batch max size to more
+than 1 will aggregate the entries until the batch reaches the max size or the timeout expires.
+
+
+## Configurations
+
+The only mandatory parameter to create a batch processor is a function. The function is executed when the batch reaches the max size
+or when the buffer duration is exceeded.
+
+|Name            |Requirement |Description|
+|----------------|------------|-----------|
+|name            |optional    |A unique identifier for the batch processor; default is `log buffer`|
+|batch_max_size  |optional    |Maximum size of each batch; default is 1000|
+|inactive_timeout|optional    |Maximum age in seconds the buffer may stay inactive before it is flushed; default is 5|
+|buffer_duration |optional    |Maximum age in seconds of the oldest entry in a batch before the batch must be processed; default is 60|
+|max_retry_count |optional    |Maximum number of retries before the batch is removed from the processing pipeline; default is 0|
+|retry_delay     |optional    |Number of seconds to delay the next execution after a failed attempt; default is 1|
+
+
+The following code shows an example of how to use a batch processor. The batch processor takes the function to be executed as its first
+argument and the batch configuration as its second argument.
+
+
+```lua
+local bp = require("apisix.plugins.batch-processor")
+local func_to_execute = function(entries)
+    -- e.g. serialize to a JSON array: core.json.encode(entries)
+    -- process/send the data
+    return true
+end
+
+local config = {
+    max_retry_count = 2,
+    buffer_duration = 60,
+    inactive_timeout = 5,
+    batch_max_size = 1,
+    retry_delay = 0
+}
+
+local batch_processor, err = bp:new(func_to_execute, config)
+
+if batch_processor then
+    batch_processor:push({hello = 'world'})
+end
+```
+
+Note: Please make sure the batch max size (entry count) is within the limits of what the consuming function can process in one call.
+The timer that flushes the batch runs based on the `inactive_timeout` configuration. Thus, for optimal usage,
+keep `inactive_timeout` smaller than `buffer_duration`.
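+
+As an illustration only, the following sketch shows how a logging plugin could reuse a single batch processor from its `log` phase.
+The module layout, the `send_to_backend` consumer and the fields pushed into the buffer are assumptions made for this example,
+not something provided by this module.
+
+```lua
+local core = require("apisix.core")
+local batch_processor = require("apisix.plugins.batch-processor")
+
+-- hypothetical module-level cache so that every request reuses the same processor
+local log_buffer
+
+local function send_to_backend(entries)
+    -- placeholder consumer: serialize the batch and pretend to ship it somewhere
+    core.log.info("shipping batch: ", core.json.encode(entries))
+    return true  -- return false to exercise the retry logic
+end
+
+local _M = {}
+
+function _M.log(conf)
+    if not log_buffer then
+        local err
+        log_buffer, err = batch_processor:new(send_to_backend, {
+            name = "example logger",
+            batch_max_size = 100,
+            inactive_timeout = 5,
+            buffer_duration = 60,
+            max_retry_count = 2,
+            retry_delay = 1,
+        })
+        if not log_buffer then
+            core.log.error("failed to create batch processor: ", err)
+            return
+        end
+    end
+
+    log_buffer:push({uri = ngx.var.uri, time = ngx.now()})
+end
+
+return _M
+```
+
+Because the processor is cached at module level, entries from many requests accumulate in the same buffer and are shipped
+together once `batch_max_size` is reached or one of the timeouts expires.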
diff --git a/lua/apisix/plugins/batch-processor.lua b/lua/apisix/plugins/batch-processor.lua
new file mode 100644
index 0000000..d545c57
--- /dev/null
+++ b/lua/apisix/plugins/batch-processor.lua
@@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+    __index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+    type = "object",
+    properties = {
+        name = {type = "string", default = "log buffer"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+    }
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+    local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+    if not hdl then
+        core.log.error("failed to create process timer: ", err)
+        return
+    end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+    if premature then
+        return
+    end
+
+    local ok, err = batch_processor.func(batch.entries)
+    if not ok then
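+        -- retry the failed batch after retry_delay seconds; drop it once retry_count reaches max_retry_count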
+        batch.retry_count = batch.retry_count + 1
+        if batch.retry_count < batch_processor.max_retry_count then
+            core.log.warn(fmt("Batch Processor[%s] failed to process entries: ",
+                batch_processor.name), err)
+            schedule_func_exec(batch_processor, batch_processor.retry_delay, batch)
+        else
+            core.log.error(fmt(("Batch Processor[%s] exceeded the max_retry_count[%d], "
+                    .. "dropping the entries"), batch_processor.name, batch.retry_count))
+        end
+        return
+    end
+
+    core.log.debug(fmt("Batch Processor[%s] successfully processed the entries",
+        batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+    if premature then
+        return
+    end
+
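+    -- flush if the buffer has been inactive for inactive_timeout seconds
+    -- or if its oldest entry is older than buffer_duration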
+    if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or
+            now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then
+        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush",
+            batch_processor.name))
+        batch_processor:process_buffer()
+        batch_processor.is_timer_running = false
+        return
+    end
+
+    -- the buffer is still active and its duration has not been exceeded, extending the timer
+    core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name))
+    create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+    local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor)
+    if not hdl then
+        core.log.error("failed to create buffer timer: ", err)
+        return
+    end
+    batch_processor.is_timer_running = true
+end
+
+
+function Batch_Processor:new(func, config)
+    local ok, err = core.schema.check(schema, config)
+    if not ok then
+        return nil, err
+    end
+
+    if type(func) ~= "function" then
+        return nil, "Invalid argument, arg #1 must be a function"
+    end
+
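+    -- note: core.schema.check fills absent fields with the schema defaults (e.g. name = "log buffer")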
+    local batch_processor = {
+        func = func,
+        buffer_duration = config.buffer_duration,
+        inactive_timeout = config.inactive_timeout,
+        max_retry_count = config.max_retry_count,
+        batch_max_size = config.batch_max_size,
+        retry_delay = config.retry_delay,
+        name = config.name,
+        batch_to_process = {},
+        entry_buffer = { entries = {}, retry_count = 0},
+        is_timer_running = false,
+        first_entry_t = 0,
+        last_entry_t = 0
+    }
+
+    return setmetatable(batch_processor, Batch_Processor_mt)
+end
+
+
+function Batch_Processor:push(entry)
+    -- if the batch size is one then immediately send for processing
+    if self.batch_max_size == 1 then
+        local batch = { entries = { entry }, retry_count = 0 }
+        schedule_func_exec(self, 0, batch)
+        return
+    end
+
+    local entries = self.entry_buffer.entries
+    table.insert(entries, entry)
+
+    if #entries == 1 then
+        self.first_entry_t = now()
+    end
+    self.last_entry_t = now()
+
+    if self.batch_max_size <= #entries then
+        core.log.debug(fmt("batch processor[%s] batch max size has exceeded", self.name))
+        self:process_buffer()
+    end
+
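+    -- make sure a flush timer is running so the buffered entries are drained after inactive_timeout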
+    if not self.is_timer_running then
+        create_buffer_timer(self)
+    end
+end
+
+
+function Batch_Processor:process_buffer()
+    -- if entries are present in the buffer, move them to the processing pipeline
+    if #self.entry_buffer.entries > 0 then
+        core.log.debug(fmt("transferring buffer entries to the processing pipeline, buffer count[%d]",
+            #self.entry_buffer.entries))
+        self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
+        self.entry_buffer = { entries = {}, retry_count = 0 }
+    end
+
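+    -- hand every pending batch to the consumer function via a zero-delay timer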
+    for _, batch in ipairs(self.batch_to_process) do
+        schedule_func_exec(self, 0, batch)
+    end
+    self.batch_to_process = {}
+end
+
+
+return Batch_Processor
diff --git a/t/plugin/batch-processor.t b/t/plugin/batch-processor.t
new file mode 100644
index 0000000..1fd7795
--- /dev/null
+++ b/t/plugin/batch-processor.t
@@ -0,0 +1,409 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new("", config)
+
+            if log_buffer then
+                log_buffer:push({hello='world'})
+                ngx.say("done")
+            end
+
+            if not log_buffer then
+                ngx.say("failed")
+            end
+
+        }
+    }
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local func_to_send = function(elements)
+                return true
+            end
+
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 1,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+                inactive_timeout = 1
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+batch processor[log buffer] batch max size has been exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try success
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local retry = false
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                if not retry then
+                    retry = true
+                    return false
+                end
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: Exceeding max retry count
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return false
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 7: two batches
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local count = 0
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                count = count + 1
+                core.log.info("batch[", count , "] sent")
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+batch[1] sent
+batch[2] sent
+--- wait: 0.5
+
+
+
+=== TEST 8: batch processor retry count 0 and fail processing
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 0,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                return false
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] failed to process entries
+--- error_log
+Batch Processor[log buffer] exceeded the max_retry_count
+--- wait: 0.5
+
+
+
+=== TEST 9: batch processor timeout exceeded with buffer_duration set
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+                buffer_duration = 60,
+                inactive_timeout = 1,
+            }
+            local func_to_send = function(elements)
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({hello='world'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 10: json encode and log elements
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.plugins.batch-processor")
+            local core = require("apisix.core")
+            local config = {
+                max_retry_count  = 2,
+                batch_max_size = 2,
+                process_delay  = 0,
+                retry_delay  = 0,
+            }
+            local func_to_send = function(elements)
+                core.log.info(core.json.encode(elements))
+                return true
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({msg='1'})
+            log_buffer:push({msg='2'})
+            log_buffer:push({msg='3'})
+            log_buffer:push({msg='4'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+--- error_log
+[{"msg":"1"},{"msg":"2"}]
+[{"msg":"3"},{"msg":"4"}]
+--- wait: 0.5