You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/05 02:02:42 UTC
[apisix] branch master updated: feat(stream): port syslog plugin (#6953)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 3b7e3fde0 feat(stream): port syslog plugin (#6953)
3b7e3fde0 is described below
commit 3b7e3fde0559155c79060097a27c1d3010df5046
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Thu May 5 10:02:36 2022 +0800
feat(stream): port syslog plugin (#6953)
---
Makefile | 3 +
apisix/plugins/syslog.lua | 82 +-----
apisix/plugins/{syslog.lua => syslog/init.lua} | 51 +---
apisix/stream/plugins/syslog.lua | 82 ++++++
apisix/utils/batch-processor.lua | 7 +-
conf/config-default.yaml | 1 +
t/stream-plugin/syslog.t | 343 +++++++++++++++++++++++++
7 files changed, 441 insertions(+), 128 deletions(-)
diff --git a/Makefile b/Makefile
index 39131e20c..3bc974295 100644
--- a/Makefile
+++ b/Makefile
@@ -334,6 +334,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/slslog
$(ENV_INSTALL) apisix/plugins/slslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/slslog/
+ $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/syslog
+ $(ENV_INSTALL) apisix/plugins/syslog/*.lua $(ENV_INST_LUADIR)/apisix/plugins/syslog/
+
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/zipkin
$(ENV_INSTALL) apisix/plugins/zipkin/*.lua $(ENV_INST_LUADIR)/apisix/plugins/zipkin/
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua
index be73cac55..7eb4675c0 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog.lua
@@ -18,12 +18,11 @@
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
-local logger_socket = require("resty.logger.socket")
+local syslog = require("apisix.plugins.syslog.init")
local plugin_name = "syslog"
local ngx = ngx
-
-local batch_processor_manager = bp_manager_mod.new("sys logger")
+local batch_processor_manager = bp_manager_mod.new("http sys logger")
local schema = {
type = "object",
properties = {
@@ -43,11 +42,6 @@ local schema = {
}
-local lrucache = core.lrucache.new({
- ttl = 300, count = 512, serial_creating = true,
-})
-
-
local schema = batch_processor_manager:wrap_schema(schema)
local _M = {
@@ -55,6 +49,7 @@ local _M = {
priority = 401,
name = plugin_name,
schema = schema,
+ flush_syslog = syslog.flush_syslog,
}
@@ -70,78 +65,9 @@ function _M.check_schema(conf)
end
-function _M.flush_syslog(logger)
- local ok, err = logger:flush(logger)
- if not ok then
- core.log.error("failed to flush message:", err)
- end
-
- return ok
-end
-
-
-local function send_syslog_data(conf, log_message, api_ctx)
- local err_msg
- local res = true
-
- core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
-
- -- fetch it from lrucache
- local logger, err = core.lrucache.plugin_ctx(
- lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
- host = conf.host,
- port = conf.port,
- flush_limit = conf.flush_limit,
- drop_limit = conf.drop_limit,
- timeout = conf.timeout,
- sock_type = conf.sock_type,
- pool_size = conf.pool_size,
- tls = conf.tls,
- }
- )
-
- if not logger then
- res = false
- err_msg = "failed when initiating the sys logger processor".. err
- end
-
- -- reuse the logger object
- local ok, err = logger:log(core.json.encode(log_message))
- if not ok then
- res = false
- err_msg = "failed to log message" .. err
- end
-
- return res, err_msg
-end
-
-
--- log phase in APISIX
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)
-
- if batch_processor_manager:add_entry(conf, entry) then
- return
- end
-
- -- Generate a function to be executed by the batch processor
- local cp_ctx = core.table.clone(ctx)
- local func = function(entries, batch_max_size)
- local data, err
- if batch_max_size == 1 then
- data, err = core.json.encode(entries[1]) -- encode as single {}
- else
- data, err = core.json.encode(entries) -- encode as array [{}]
- end
-
- if not data then
- return false, 'error occurred while encoding the data: ' .. err
- end
-
- return send_syslog_data(conf, data, cp_ctx)
- end
-
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ syslog.push_entry(conf, ctx, entry)
end
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog/init.lua
similarity index 68%
copy from apisix/plugins/syslog.lua
copy to apisix/plugins/syslog/init.lua
index be73cac55..24f2f62ab 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog/init.lua
@@ -16,59 +16,16 @@
--
local core = require("apisix.core")
-local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local logger_socket = require("resty.logger.socket")
-local plugin_name = "syslog"
-local ngx = ngx
-
local batch_processor_manager = bp_manager_mod.new("sys logger")
-local schema = {
- type = "object",
- properties = {
- host = {type = "string"},
- port = {type = "integer"},
- max_retry_times = {type = "integer", minimum = 1},
- retry_interval = {type = "integer", minimum = 0},
- flush_limit = {type = "integer", minimum = 1, default = 4096},
- drop_limit = {type = "integer", default = 1048576},
- timeout = {type = "integer", minimum = 1, default = 3000},
- sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
- pool_size = {type = "integer", minimum = 5, default = 5},
- tls = {type = "boolean", default = false},
- include_req_body = {type = "boolean", default = false}
- },
- required = {"host", "port"}
-}
-
local lrucache = core.lrucache.new({
ttl = 300, count = 512, serial_creating = true,
})
-
-local schema = batch_processor_manager:wrap_schema(schema)
-
-local _M = {
- version = 0.1,
- priority = 401,
- name = plugin_name,
- schema = schema,
-}
-
-
-function _M.check_schema(conf)
- local ok, err = core.schema.check(schema, conf)
- if not ok then
- return false, err
- end
-
- conf.max_retry_count = conf.max_retry_times or conf.max_retry_count
- conf.retry_delay = conf.retry_interval or conf.retry_delay
- return true
-end
-
+local _M = {}
function _M.flush_syslog(logger)
local ok, err = logger:flush(logger)
@@ -116,10 +73,8 @@ local function send_syslog_data(conf, log_message, api_ctx)
end
--- log phase in APISIX
-function _M.log(conf, ctx)
- local entry = log_util.get_full_log(ngx, conf)
-
+-- called in log phase of APISIX
+function _M.push_entry(conf, ctx, entry)
if batch_processor_manager:add_entry(conf, entry) then
return
end
diff --git a/apisix/stream/plugins/syslog.lua b/apisix/stream/plugins/syslog.lua
new file mode 100644
index 000000000..fcae83033
--- /dev/null
+++ b/apisix/stream/plugins/syslog.lua
@@ -0,0 +1,82 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local syslog = require("apisix.plugins.syslog.init")
+local plugin = require("apisix.plugin")
+local plugin_name = "syslog"
+
+local batch_processor_manager = bp_manager_mod.new("stream sys logger") -- only wraps the schema
+local schema = { -- plugin configuration schema
+ type = "object",
+ properties = {
+ host = {type = "string"},
+ port = {type = "integer"},
+ flush_limit = {type = "integer", minimum = 1, default = 4096},
+ drop_limit = {type = "integer", default = 1048576},
+ timeout = {type = "integer", minimum = 1, default = 3000},
+ sock_type = {type = "string", default = "tcp", enum = {"tcp", "udp"}},
+ pool_size = {type = "integer", minimum = 5, default = 5},
+ tls = {type = "boolean", default = false}
+ },
+ required = {"host", "port"} -- every other property declares a default
+}
+
+local schema = batch_processor_manager:wrap_schema(schema) -- presumably adds batch fields; confirm
+
+local metadata_schema = { -- plugin metadata schema: only log_format is declared
+ type = "object",
+ properties = {
+ log_format = log_util.metadata_schema_log_format,
+ },
+}
+
+local _M = { -- plugin definition table returned by this module
+ version = 0.1,
+ priority = 401,
+ name = plugin_name,
+ schema = schema,
+ metadata_schema = metadata_schema,
+ flush_syslog = syslog.flush_syslog, -- re-exported from apisix.plugins.syslog.init
+}
+
+
+function _M.check_schema(conf, schema_type)
+    -- plugin metadata and plugin conf are checked against different schemas
+    local is_metadata = schema_type == core.schema.TYPE_METADATA
+    local target = is_metadata and metadata_schema or schema
+    return core.schema.check(target, conf)
+end
+
+
+function _M.log(conf, ctx)
+    -- the entry layout comes from plugin metadata; without it nothing is pushed
+    local meta = plugin.plugin_metadata(plugin_name)
+    local log_format = meta and meta.value.log_format
+    if not log_format or core.table.nkeys(log_format) <= 0 then
+        core.log.error("syslog's log_format is not set")
+        return
+    end
+
+    local entry = log_util.get_custom_format_log(ctx, log_format)
+    syslog.push_entry(conf, ctx, entry)
+end
+
+
+return _M
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index 6d3bf53f2..5e324e8d6 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -28,7 +28,10 @@ local batch_processor_mt = {
local execute_func
local create_buffer_timer
local batch_metrics
-local prometheus = require("apisix.plugins.prometheus.exporter")
+local prometheus
+if ngx.config.subsystem == "http" then
+ prometheus = require("apisix.plugins.prometheus.exporter")
+end
local schema = {
@@ -181,7 +184,7 @@ function batch_processor:push(entry)
return
end
- if not batch_metrics and prometheus.get_prometheus() and self.name
+ if prometheus and not batch_metrics and self.name
and self.route_id and self.server_addr then
batch_metrics = prometheus.get_prometheus():gauge("batch_process_entries",
"batch process remaining entries",
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 4522fdc18..0bda63548 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -402,6 +402,7 @@ stream_plugins: # sorted by priority
- ip-restriction # priority: 3000
- limit-conn # priority: 1003
- mqtt-proxy # priority: 1000
+ - syslog # priority: 401
# <- recommend to use priority (0, 100) for your custom plugins
#wasm:
diff --git a/t/stream-plugin/syslog.t b/t/stream-plugin/syslog.t
new file mode 100644
index 000000000..c6d96c95e
--- /dev/null
+++ b/t/stream-plugin/syslog.t
@@ -0,0 +1,343 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_shuffle();
+no_root_location();
+
+
+add_block_preprocessor(sub { # fills in per-block defaults before the tests run
+ my ($block) = @_;
+
+ if (!$block->error_log && !$block->no_error_log) { # block declares no log expectation
+ $block->set_value("no_error_log", "[error]\n[alert]");
+ }
+
+ if (!defined $block->extra_stream_config) { # default: UDP listener on 8125 served by a mock
+ my $stream_config = <<_EOC_;
+ server {
+ listen 8125 udp;
+ content_by_lua_block {
+ require("lib.mock_layer4").dogstatsd()
+ }
+ }
+_EOC_
+ $block->set_value("extra_stream_config", $stream_config);
+ }
+
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: custom log format not set
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/upstreams/1',
+ ngx.HTTP_PUT,
+ [[{
+ "nodes": {
+ "127.0.0.1:1995": 1
+ },
+ "type": "roundrobin"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host" : "127.0.0.1",
+ "port" : 8125,
+ "sock_type": "udp",
+ "batch_max_size": 1,
+ "flush_limit":1
+ }
+ },
+ "upstream_id": "1"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 2: hit
+--- stream_request eval
+mmm
+--- stream_response
+hello world
+--- error_log
+syslog's log_format is not set
+
+
+
+=== TEST 3: set custom log format
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/plugin_metadata/syslog',
+ ngx.HTTP_PUT,
+ [[{
+ "log_format": {
+ "client_ip": "$remote_addr"
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+
+
+
+=== TEST 4: hit
+--- stream_request eval
+mmm
+--- stream_response
+hello world
+--- wait: 0.5
+--- error_log eval
+qr/message received:.*\"client_ip\\"\:\\"127.0.0.1\\"/
+
+
+
+=== TEST 5: flush manually
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.stream.plugins.syslog")
+ local logger_socket = require("resty.logger.socket")
+ local logger, err = logger_socket:new({
+ host = "127.0.0.1",
+ port = 5044,
+ flush_limit = 100,
+ })
+
+ local bytes, err = logger:log("abc")
+ if err then
+ ngx.log(ngx.ERR, err)
+ end
+
+ local bytes, err = logger:log("efg")
+ if err then
+ ngx.log(ngx.ERR, err)
+ end
+
+ local ok, err = plugin.flush_syslog(logger)
+ if not ok then
+ ngx.say("failed to flush syslog: ", err)
+ return
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+
+
+
+=== TEST 6: small flush_limit, instant flush
+--- stream_conf_enable
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host" : "127.0.0.1",
+ "port" : 5044,
+ "flush_limit" : 1,
+ "inactive_timeout": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1995": 1
+ },
+ "type": "roundrobin"
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+
+ -- wait etcd sync
+ ngx.sleep(0.5)
+
+ local sock = ngx.socket.tcp()
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.say("failed to connect: ", err)
+ return
+ end
+
+ assert(sock:send("mmm"))
+ local data = assert(sock:receive("*a"))
+ ngx.print(data)
+
+ -- wait flush log
+ ngx.sleep(2.5)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+hello world
+--- timeout: 5
+--- error_log
+try to lock with key stream/route#1
+unlock with key stream/route#1
+
+
+
+=== TEST 7: check plugin configuration updating
+--- stream_conf_enable
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body1 = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host" : "127.0.0.1",
+ "port" : 5044,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1995": 1
+ },
+ "type": "roundrobin"
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local sock = ngx.socket.tcp()
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ assert(sock:send("mmm"))
+ local body2 = assert(sock:receive("*a"))
+
+ local code, body3 = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host" : "127.0.0.1",
+ "port" : 5045,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1995": 1
+ },
+ "type": "roundrobin"
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local sock = ngx.socket.tcp()
+ local ok, err = sock:connect("127.0.0.1", 1985)
+ if not ok then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ assert(sock:send("mmm"))
+ local body4 = assert(sock:receive("*a"))
+
+ ngx.print(body1)
+ ngx.print(body2)
+ ngx.print(body3)
+ ngx.print(body4)
+ }
+ }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedhello world
+passedhello world
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045