You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/05 02:02:42 UTC

[apisix] branch master updated: feat(stream): port syslog plugin (#6953)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b7e3fde0 feat(stream): port syslog plugin (#6953)
3b7e3fde0 is described below

commit 3b7e3fde0559155c79060097a27c1d3010df5046
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Thu May 5 10:02:36 2022 +0800

    feat(stream): port syslog plugin (#6953)
---
 Makefile                                       |   3 +
 apisix/plugins/syslog.lua                      |  82 +-----
 apisix/plugins/{syslog.lua => syslog/init.lua} |  51 +---
 apisix/stream/plugins/syslog.lua               |  82 ++++++
 apisix/utils/batch-processor.lua               |   7 +-
 conf/config-default.yaml                       |   1 +
 t/stream-plugin/syslog.t                       | 343 +++++++++++++++++++++++++
 7 files changed, 441 insertions(+), 128 deletions(-)

diff --git a/Makefile b/Makefile
index 39131e20c..3bc974295 100644
--- a/Makefile
+++ b/Makefile
@@ -334,6 +334,9 @@ install: runtime
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/slslog
 	$(ENV_INSTALL) apisix/plugins/slslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/slslog/
 
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/syslog
+	$(ENV_INSTALL) apisix/plugins/syslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/syslog/
+
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/zipkin
 	$(ENV_INSTALL) apisix/plugins/zipkin/*.lua $(ENV_INST_LUADIR)/apisix/plugins/zipkin/
 
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua
index be73cac55..7eb4675c0 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog.lua
@@ -18,12 +18,11 @@
 local core = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
 local bp_manager_mod = require("apisix.utils.batch-processor-manager")
-local logger_socket = require("resty.logger.socket")
+local syslog = require("apisix.plugins.syslog.init")
 local plugin_name = "syslog"
 local ngx = ngx
 
-
-local batch_processor_manager = bp_manager_mod.new("sys logger")
+local batch_processor_manager = bp_manager_mod.new("http sys logger")
 local schema = {
     type = "object",
     properties = {
@@ -43,11 +42,6 @@ local schema = {
 }
 
 
-local lrucache = core.lrucache.new({
-    ttl = 300, count = 512, serial_creating = true,
-})
-
-
 local schema = batch_processor_manager:wrap_schema(schema)
 
 local _M = {
@@ -55,6 +49,7 @@ local _M = {
     priority = 401,
     name = plugin_name,
     schema = schema,
+    flush_syslog = syslog.flush_syslog,
 }
 
 
@@ -70,78 +65,9 @@ function _M.check_schema(conf)
 end
 
 
-function _M.flush_syslog(logger)
-    local ok, err = logger:flush(logger)
-    if not ok then
-        core.log.error("failed to flush message:", err)
-    end
-
-    return ok
-end
-
-
-local function send_syslog_data(conf, log_message, api_ctx)
-    local err_msg
-    local res = true
-
-    core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
-
-    -- fetch it from lrucache
-    local logger, err = core.lrucache.plugin_ctx(
-        lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
-            host = conf.host,
-            port = conf.port,
-            flush_limit = conf.flush_limit,
-            drop_limit = conf.drop_limit,
-            timeout = conf.timeout,
-            sock_type = conf.sock_type,
-            pool_size = conf.pool_size,
-            tls = conf.tls,
-        }
-    )
-
-    if not logger then
-        res = false
-        err_msg = "failed when initiating the sys logger processor".. err
-    end
-
-    -- reuse the logger object
-    local ok, err = logger:log(core.json.encode(log_message))
-    if not ok then
-        res = false
-        err_msg = "failed to log message" .. err
-    end
-
-    return res, err_msg
-end
-
-
--- log phase in APISIX
 function _M.log(conf, ctx)
     local entry = log_util.get_full_log(ngx, conf)
-
-    if batch_processor_manager:add_entry(conf, entry) then
-        return
-    end
-
-    -- Generate a function to be executed by the batch processor
-    local cp_ctx = core.table.clone(ctx)
-    local func = function(entries, batch_max_size)
-        local data, err
-        if batch_max_size == 1 then
-            data, err = core.json.encode(entries[1]) -- encode as single {}
-        else
-            data, err = core.json.encode(entries) -- encode as array [{}]
-        end
-
-        if not data then
-            return false, 'error occurred while encoding the data: ' .. err
-        end
-
-        return send_syslog_data(conf, data, cp_ctx)
-    end
-
-    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+    syslog.push_entry(conf, ctx, entry)
 end
 
 
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog/init.lua
similarity index 68%
copy from apisix/plugins/syslog.lua
copy to apisix/plugins/syslog/init.lua
index be73cac55..24f2f62ab 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog/init.lua
@@ -16,59 +16,16 @@
 --
 
 local core = require("apisix.core")
-local log_util = require("apisix.utils.log-util")
 local bp_manager_mod = require("apisix.utils.batch-processor-manager")
 local logger_socket = require("resty.logger.socket")
-local plugin_name = "syslog"
-local ngx = ngx
-
 
 local batch_processor_manager = bp_manager_mod.new("sys logger")
-local schema = {
-    type = "object",
-    properties = {
-        host = {type = "string"},
-        port = {type = "integer"},
-        max_retry_times = {type = "integer", minimum = 1},
-        retry_interval = {type = "integer", minimum = 0},
-        flush_limit = {type = "integer", minimum = 1, default = 4096},
-        drop_limit = {type = "integer", default = 1048576},
-        timeout = {type = "integer", minimum = 1, default = 3000},
-        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
-        pool_size = {type = "integer", minimum = 5, default = 5},
-        tls = {type = "boolean", default = false},
-        include_req_body = {type = "boolean", default = false}
-    },
-    required = {"host", "port"}
-}
-
 
 local lrucache = core.lrucache.new({
     ttl = 300, count = 512, serial_creating = true,
 })
 
-
-local schema = batch_processor_manager:wrap_schema(schema)
-
-local _M = {
-    version = 0.1,
-    priority = 401,
-    name = plugin_name,
-    schema = schema,
-}
-
-
-function _M.check_schema(conf)
-    local ok, err = core.schema.check(schema, conf)
-    if not ok then
-        return false, err
-    end
-
-    conf.max_retry_count = conf.max_retry_times or conf.max_retry_count
-    conf.retry_delay = conf.retry_interval or conf.retry_delay
-    return true
-end
-
+local _M = {}
 
 function _M.flush_syslog(logger)
     local ok, err = logger:flush(logger)
@@ -116,10 +73,8 @@ local function send_syslog_data(conf, log_message, api_ctx)
 end
 
 
--- log phase in APISIX
-function _M.log(conf, ctx)
-    local entry = log_util.get_full_log(ngx, conf)
-
+-- called in log phase of APISIX
+function _M.push_entry(conf, ctx, entry)
     if batch_processor_manager:add_entry(conf, entry) then
         return
     end
diff --git a/apisix/stream/plugins/syslog.lua b/apisix/stream/plugins/syslog.lua
new file mode 100644
index 000000000..fcae83033
--- /dev/null
+++ b/apisix/stream/plugins/syslog.lua
@@ -0,0 +1,82 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local syslog = require("apisix.plugins.syslog.init")
+local plugin = require("apisix.plugin")
+local plugin_name = "syslog"
+
+local batch_processor_manager = bp_manager_mod.new("stream sys logger")
+local schema = {
+    type = "object",
+    properties = {
+        host = {type = "string"},
+        port = {type = "integer"},
+        flush_limit = {type = "integer", minimum = 1, default = 4096},
+        drop_limit = {type = "integer", default = 1048576},
+        timeout = {type = "integer", minimum = 1, default = 3000},
+        sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
+        pool_size = {type = "integer", minimum = 5, default = 5},
+        tls = {type = "boolean", default = false}
+    },
+    required = {"host", "port"}
+}
+
+local schema = batch_processor_manager:wrap_schema(schema)
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+local _M = {
+    version = 0.1,
+    priority = 401,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+    flush_syslog = syslog.flush_syslog,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not metadata or not metadata.value.log_format
+          or core.table.nkeys(metadata.value.log_format) <= 0
+    then
+        core.log.error("syslog's log_format is not set")
+        return
+    end
+
+    local entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    syslog.push_entry(conf, ctx, entry)
+end
+
+
+return _M
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index 6d3bf53f2..5e324e8d6 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -28,7 +28,10 @@ local batch_processor_mt = {
 local execute_func
 local create_buffer_timer
 local batch_metrics
-local prometheus = require("apisix.plugins.prometheus.exporter")
+local prometheus
+if ngx.config.subsystem == "http" then
+    prometheus = require("apisix.plugins.prometheus.exporter")
+end
 
 
 local schema = {
@@ -181,7 +184,7 @@ function batch_processor:push(entry)
         return
     end
 
-    if not batch_metrics and prometheus.get_prometheus() and self.name
+    if prometheus and not batch_metrics and self.name
        and self.route_id and self.server_addr then
         batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries",
                                                           "batch process remaining entries",
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 4522fdc18..0bda63548 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -402,6 +402,7 @@ stream_plugins: # sorted by priority
   - ip-restriction                 # priority: 3000
   - limit-conn                     # priority: 1003
   - mqtt-proxy                     # priority: 1000
+  - syslog                         # priority: 401
   # <- recommend to use priority (0, 100) for your custom plugins
 
 #wasm:
diff --git a/t/stream-plugin/syslog.t b/t/stream-plugin/syslog.t
new file mode 100644
index 000000000..c6d96c95e
--- /dev/null
+++ b/t/stream-plugin/syslog.t
@@ -0,0 +1,343 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_shuffle();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->error_log && !$block->no_error_log) {
+        $block->set_value("no_error_log", "[error]\n[alert]");
+    }
+
+    if (!defined $block->extra_stream_config) {
+        my $stream_config = <<_EOC_;
+    server {
+        listen 8125 udp;
+        content_by_lua_block {
+            require("lib.mock_layer4").dogstatsd()
+        }
+    }
+_EOC_
+        $block->set_value("extra_stream_config", $stream_config);
+    }
+
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: custom log format not set
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/upstreams/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "nodes": {
+                        "127.0.0.1:1995": 1
+                    },
+                    "type": "roundrobin"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            local code, body = t('/apisix/admin/stream_routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "syslog": {
+                            "host" : "127.0.0.1",
+                            "port" : 8125,
+                            "sock_type": "udp",
+                            "batch_max_size": 1,
+                            "flush_limit":1
+                        }
+                    },
+                    "upstream_id": "1"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: hit
+--- stream_request eval
+mmm
+--- stream_response
+hello world
+--- error_log
+syslog's log_format is not set
+
+
+
+=== TEST 3: set custom log format
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/syslog',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "client_ip": "$remote_addr"
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 4: hit
+--- stream_request eval
+mmm
+--- stream_response
+hello world
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 5: flush manually
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.stream.plugins.syslog")
+            local logger_socket = require("resty.logger.socket")
+            local logger, err = logger_socket:new({
+                    host = "127.0.0.1",
+                    port = 5044,
+                    flush_limit = 100,
+            })
+
+            local bytes, err = logger:log("abc")
+            if err then
+                ngx.log(ngx.ERR, err)
+            end
+
+            local bytes, err = logger:log("efg")
+            if err then
+                ngx.log(ngx.ERR, err)
+            end
+
+            local ok, err = plugin.flush_syslog(logger)
+            if not ok then
+                ngx.say("failed to flush syslog: ", err)
+                return
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+
+
+
+=== TEST 6: small flush_limit, instant flush
+--- stream_conf_enable
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "syslog": {
+                                "host" : "127.0.0.1",
+                                "port" : 5044,
+                                "flush_limit" : 1,
+                                "inactive_timeout": 1
+                            }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1995": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+
+            -- wait etcd sync
+            ngx.sleep(0.5)
+
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            assert(sock:send("mmm"))
+            local data = assert(sock:receive("*a"))
+            ngx.print(data)
+
+            -- wait flush log
+            ngx.sleep(2.5)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+hello world
+--- timeout: 5
+--- error_log
+try to lock with key stream/route#1
+unlock with key stream/route#1
+
+
+
+=== TEST 7: check plugin configuration updating
+--- stream_conf_enable
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body1 = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "syslog": {
+                                "host" : "127.0.0.1",
+                                "port" : 5044,
+                                "batch_max_size": 1
+                            }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1995": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            assert(sock:send("mmm"))
+            local body2 = assert(sock:receive("*a"))
+
+            local code, body3 = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "syslog": {
+                                "host" : "127.0.0.1",
+                                "port" : 5045,
+                                 "batch_max_size": 1
+                            }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1995": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            local sock = ngx.socket.tcp()
+            local ok, err = sock:connect("127.0.0.1", 1985)
+            if not ok then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            assert(sock:send("mmm"))
+            local body4 = assert(sock:receive("*a"))
+
+            ngx.print(body1)
+            ngx.print(body2)
+            ngx.print(body3)
+            ngx.print(body4)
+        }
+    }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedhello world
+passedhello world
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045