You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by mo...@apache.org on 2023/05/23 06:30:26 UTC
[apisix] branch master updated: feat(config_etcd): use a single long http connection to watch all resources (#9456)
This is an automated email from the ASF dual-hosted git repository.
monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 4377d0549 feat(config_etcd): use a single long http connection to watch all resources (#9456)
4377d0549 is described below
commit 4377d05491ae2af228d40e3900362371363a8f46
Author: jinhua luo <ho...@163.com>
AuthorDate: Tue May 23 14:30:16 2023 +0800
feat(config_etcd): use a single long http connection to watch all resources (#9456)
---
apisix/core/config_etcd.lua | 312 ++++++++++++++++++++++++++++-----
ci/linux_openresty_runner.sh | 2 +-
t/core/etcd-sync.t | 68 +------
t/plugin/error-log-logger-skywalking.t | 4 +-
4 files changed, 275 insertions(+), 111 deletions(-)
diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua
index 4946cc5c2..ecb762704 100644
--- a/apisix/core/config_etcd.lua
+++ b/apisix/core/config_etcd.lua
@@ -27,6 +27,10 @@ local json = require("apisix.core.json")
local etcd_apisix = require("apisix.core.etcd")
local core_str = require("apisix.core.string")
local new_tab = require("table.new")
+local inspect = require("inspect")
+local errlog = require("ngx.errlog")
+local log_level = errlog.get_sys_filter_level()
+local NGX_INFO = ngx.INFO
local check_schema = require("apisix.core.schema").check
local exiting = ngx.worker.exiting
local insert_tab = table.insert
@@ -43,9 +47,14 @@ local xpcall = xpcall
local debug = debug
local string = string
local error = error
+local pairs = pairs
+local next = next
+local assert = assert
local rand = math.random
local constants = require("apisix.constants")
local health_check = require("resty.etcd.health_check")
+local semaphore = require("ngx.semaphore")
+local tablex = require("pl.tablex")
local is_http = ngx.config.subsystem == "http"
@@ -58,6 +67,7 @@ if not is_http then
end
local created_obj = {}
local loaded_configuration = {}
+local watch_ctx
local _M = {
@@ -75,6 +85,208 @@ local mt = {
}
+-- Returns the etcd syncer client, creating it on first use and reusing the
+-- same instance afterwards. On the creating call the error value reported by
+-- etcd_apisix.get_etcd_syncer() is returned as a second result.
+local get_etcd
+do
+    -- memoized client, private to this do-block
+    local cached_cli
+
+    function get_etcd()
+        if cached_cli == nil then
+            local _, err
+            cached_cli, _, err = etcd_apisix.get_etcd_syncer()
+            return cached_cli, err
+        end
+
+        return cached_cli
+    end
+end
+
+
+-- Asks the shared etcd client to cancel the watch bound to `http_cli` and
+-- logs the outcome. A result of 1 from watchcancel() is treated as success;
+-- anything else is logged as an error together with the returned message.
+local function cancel_watch(http_cli)
+    local status, cancel_err = watch_ctx.cli:watchcancel(http_cli)
+    if status ~= 1 then
+        log.error("cancel watch failed: ", cancel_err)
+        return
+    end
+
+    log.info("cancel watch connection success")
+end
+
+
+-- Pushes one watch result (or error) onto the shared queue watch_ctx.res and
+-- wakes every consumer currently registered in watch_ctx.sema. The set of
+-- registered semaphores is emptied afterwards.
+local function produce_res(res, err)
+    -- inspect() is only evaluated when info logging is enabled
+    if log_level >= NGX_INFO then
+        log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    end
+
+    local entry = {res = res, err = err}
+    insert_tab(watch_ctx.res, entry)
+
+    local pending = watch_ctx.sema
+    for _, waiter in pairs(pending) do
+        waiter:post()
+    end
+    table.clear(pending)
+end
+
+
+-- Timer callback that runs the single etcd watch shared by all resources.
+--
+-- Steps:
+--   1. resolve the etcd prefix and the etcd client;
+--   2. pick the revision to start from: the "X-Etcd-Index" header of the
+--      pre-loaded configuration if there is one, otherwise the revision
+--      returned by a `get` on the prefix (retried every 3s until it succeeds);
+--   3. mark the context as started and wake everybody parked in
+--      watch_ctx.wait_init;
+--   4. loop forever: open a watchdir stream from watch_ctx.rev, feed each
+--      event batch to produce_res(), and reopen the stream after a timeout,
+--      an error or a cancel.
+--
+-- @param premature  when truthy the callback returns immediately
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. tostring(err))
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        -- `string` is the library table and is not callable; use tostring()
+        error("failed to create etcd instance: " .. tostring(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        -- nothing pre-loaded: ask etcd for its current revision
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                -- keep the value in `rev`; it is turned into the start
+                -- revision just below (storing it in watch_ctx.rev here
+                -- would be overwritten by `rev + 1` with rev still 0)
+                rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    -- start watching right after the revision we already know about
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            if log_level >= NGX_INFO then
+                log.info("res_func: ", inspect(res))
+            end
+
+            if not res then
+                -- closed/timeout/broken pipe are not logged as errors; the
+                -- stream is reopened by the outer loop either way
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe"
+                then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- the "created" acknowledgement carries no events
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                    -- consumers receive the "compacted" error from the queue
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup: find the smallest queue position still needed by any
+            -- consumer; everything before it has been read by all of them
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            -- drop the payload of consumed entries but keep the slots so the
+            -- per-consumer indexes stay valid
+            for i = 1, min_idx - 1 do
+                watch_ctx.res[i] = false
+            end
+
+            if min_idx > 100 then
+                -- rebase every consumer index, then physically remove the
+                -- consumed slots from the head of the queue
+                for k, idx in pairs(watch_ctx.idx) do
+                    watch_ctx.idx[k] = idx - min_idx + 1
+                end
+                -- trim the res table
+                for i = 1, min_idx - 1 do
+                    table.remove(watch_ctx.res, 1)
+                end
+            end
+
+            local rev = tonumber(res.result.header.revision)
+            if rev > watch_ctx.rev then
+                watch_ctx.rev = rev + 1
+            end
+            produce_res(res)
+        end
+    end
+end
+
+
+-- Makes sure the shared watch context exists and that the main watcher has
+-- started before returning. The first caller creates the context and
+-- schedules run_watch in a zero-delay timer. Callers arriving while
+-- `started` is still false park on their own semaphore, registered under
+-- `key` in watch_ctx.wait_init, retrying the 60s wait until it is posted.
+local function init_watch_ctx(key)
+    if not watch_ctx then
+        watch_ctx = {
+            idx = {},
+            res = {},
+            sema = {},
+            wait_init = {},
+            started = false,
+        }
+        ngx_timer_at(0, run_watch)
+    end
+
+    if watch_ctx.started ~= false then
+        return
+    end
+
+    -- wait until the main watcher is started
+    local waiter, new_err = semaphore.new()
+    if not waiter then
+        error(new_err)
+    end
+    watch_ctx.wait_init[key] = waiter
+
+    local ok, wait_err
+    repeat
+        ok, wait_err = waiter:wait(60)
+        if not ok then
+            log.error("wait main watcher to start, key: ", key, ", err: ", wait_err)
+        end
+    until ok
+end
+
+
local function getkey(etcd_cli, key)
if not etcd_cli then
return nil, "not inited"
@@ -157,45 +369,67 @@ local function flush_watching_streams(self)
end
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+-- Waits for the next batch of events under `key`, reading from the queue
+-- filled by the shared watcher (watch_ctx.res) rather than opening its own
+-- watch connection.
+--
+-- Each key has its own read position in watch_ctx.idx. Queue entries are
+-- scanned from that position; an entry carrying an error is returned as-is,
+-- and an entry whose revision is greater than self.prev_index is filtered
+-- down to the events whose etcd key starts with `key`. When nothing matches,
+-- the caller sleeps on self.watch_sema until the watcher posts it or the
+-- timeout expires.
+--
+-- @param self            config object; prev_index and watch_sema are used
+-- @param etcd_cli        not used by this implementation
+-- @param key             resource key prefix, matched literally
+-- @param modified_index  not used by this implementation
+-- @param timeout         seconds to wait for new events (60 when nil)
+-- @return a deep copy of the watch response holding only the matching
+--         events, or nil plus an error string
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            -- slot already released by the watcher's cleanup; the read
+            -- position was advanced above, so restart the scan from there
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        -- ignore res with revision smaller than self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            local res2
+            for _, evt in ipairs(res.result.events) do
+                -- plain (non-pattern) prefix match: `key` may contain
+                -- characters such as "-" or "." that are magic in Lua patterns
+                if evt.kv.key:find(key, 1, true) == 1 then
+                    if not res2 then
+                        -- copy lazily so the queued response stays intact
+                        -- for the other consumers
+                        res2 = tablex.deepcopy(res)
+                        table.clear(res2.result.events)
+                    end
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+
+            if res2 then
+                if log_level >= NGX_INFO then
+                    log.info("http_waitdir: ", inspect(res2))
+                end
+                return res2
+            end
         end
     end
-    if not res then
-        return nil, err
+    -- if no events, wait via semaphore
+    if not self.watch_sema then
+        local sema, err = semaphore.new()
+        if not sema then
+            error(err)
+        end
+        self.watch_sema = sema
     end
-    if type(res.result) ~= "table" then
-        err = "failed to wait etcd dir"
-        if res.error and res.error.message then
-            err = err .. ": " .. res.error.message
+    watch_ctx.sema[key] = self.watch_sema
+    local ok, err = self.watch_sema:wait(timeout or 60)
+    watch_ctx.sema[key] = nil
+    if ok then
+        goto iterate_events
+    else
+        if err ~= "timeout" then
+            log.error("wait watch event, key=", key, ", err: ", err)
         end
         return nil, err
     end
-
-    return res, err
 end
@@ -213,7 +447,7 @@ local function waitdir(self)
if etcd_cli.use_grpc then
res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout)
else
- res, err = http_waitdir(etcd_cli, key, modified_index, timeout)
+ res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout)
end
if not res then
@@ -359,6 +593,10 @@ local function sync_data(self)
return nil, "missing 'key' arguments"
end
+ if not self.etcd_cli.use_grpc then
+ init_watch_ctx(self.key)
+ end
+
if self.need_reload then
flush_watching_streams(self)
@@ -555,22 +793,6 @@ function _M.getkey(self, key)
end
-local get_etcd
-do
- local etcd_cli
-
- function get_etcd()
- if etcd_cli ~= nil then
- return etcd_cli
- end
-
- local _, err
- etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
- return etcd_cli, err
- end
-end
-
-
local function _automatic_fetch(premature, self)
if premature then
return
diff --git a/ci/linux_openresty_runner.sh b/ci/linux_openresty_runner.sh
index 2cdc87b21..877248913 100755
--- a/ci/linux_openresty_runner.sh
+++ b/ci/linux_openresty_runner.sh
@@ -18,5 +18,5 @@
export OPENRESTY_VERSION=source
-export TEST_CI_USE_GRPC=true
+#export TEST_CI_USE_GRPC=true
. ./ci/linux_openresty_common_runner.sh
diff --git a/t/core/etcd-sync.t b/t/core/etcd-sync.t
index e74ae19ec..aef5e2361 100644
--- a/t/core/etcd-sync.t
+++ b/t/core/etcd-sync.t
@@ -22,65 +22,7 @@ run_tests;
__DATA__
-=== TEST 1: minus timeout to watch repeatedly
---- yaml_config
-deployment:
- role: traditional
- role_traditional:
- config_provider: etcd
- etcd:
- # this test requires the HTTP long pull as the gRPC stream is shared and can't change
- # default timeout in the fly
- use_grpc: false
- admin:
- admin_key: null
---- config
- location /t {
- content_by_lua_block {
- local core = require("apisix.core")
- local t = require("lib.test_admin").test
-
- local consumers, _ = core.config.new("/consumers", {
- automatic = true,
- item_schema = core.schema.consumer,
- timeout = 0.2
- })
-
- ngx.sleep(0.6)
- local idx = consumers.prev_index
-
- local code, body = t('/apisix/admin/consumers',
- ngx.HTTP_PUT,
- [[{
- "username": "jobs",
- "plugins": {
- "basic-auth": {
- "username": "jobs",
- "password": "123456"
- }
- }
- }]])
-
- ngx.sleep(2)
- local new_idx = consumers.prev_index
- core.log.info("idx:", idx, " new_idx: ", new_idx)
- if new_idx > idx then
- ngx.say("prev_index updated")
- else
- ngx.say("prev_index not update")
- end
- }
- }
---- request
-GET /t
---- response_body
-prev_index updated
---- error_log eval
-qr/(create watch stream for key|cancel watch connection success)/
-
-
-
-=== TEST 2: using default timeout
+=== TEST 1: using default timeout
--- config
location /t {
content_by_lua_block {
@@ -126,7 +68,7 @@ waitdir key
-=== TEST 3: no update
+=== TEST 2: no update
--- config
location /t {
content_by_lua_block {
@@ -162,7 +104,7 @@ prev_index not update
-=== TEST 4: bad plugin configuration (validated via incremental sync)
+=== TEST 3: bad plugin configuration (validated via incremental sync)
--- config
location /t {
content_by_lua_block {
@@ -182,7 +124,7 @@ property "uri" validation failed
-=== TEST 5: bad plugin configuration (validated via full sync)
+=== TEST 4: bad plugin configuration (validated via full sync)
--- config
location /t {
content_by_lua_block {
@@ -196,7 +138,7 @@ property "uri" validation failed
-=== TEST 6: bad plugin configuration (validated without sync during start)
+=== TEST 5: bad plugin configuration (validated without sync during start)
--- extra_yaml_config
disable_sync_configuration_during_start: true
--- config
diff --git a/t/plugin/error-log-logger-skywalking.t b/t/plugin/error-log-logger-skywalking.t
index ffebf1cf4..edb5003c0 100644
--- a/t/plugin/error-log-logger-skywalking.t
+++ b/t/plugin/error-log-logger-skywalking.t
@@ -118,8 +118,8 @@ qr/Batch Processor\[error-log-logger\] failed to process entries: error while se
--- request
GET /tg
--- response_body
---- error_log eval
-qr/.*\[\{\"body\":\{\"text\":\{\"text\":\".*this is an error message for test.*\"\}\},\"endpoint\":\"\",\"service\":\"APISIX\",\"serviceInstance\":\"instance\".*/
+--- error_log
+this is an error message for test
--- wait: 5