Posted to notifications@apisix.apache.org by mo...@apache.org on 2023/05/23 06:30:26 UTC

[apisix] branch master updated: feat(config_etcd): use a single long http connection to watch all resources (#9456)

This is an automated email from the ASF dual-hosted git repository.

monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4377d0549 feat(config_etcd): use a single long http connection to watch all resources (#9456)
4377d0549 is described below

commit 4377d05491ae2af228d40e3900362371363a8f46
Author: jinhua luo <ho...@163.com>
AuthorDate: Tue May 23 14:30:16 2023 +0800

    feat(config_etcd): use a single long http connection to watch all resources (#9456)
---
 apisix/core/config_etcd.lua            | 312 ++++++++++++++++++++++++++++-----
 ci/linux_openresty_runner.sh           |   2 +-
 t/core/etcd-sync.t                     |  68 +------
 t/plugin/error-log-logger-skywalking.t |   4 +-
 4 files changed, 275 insertions(+), 111 deletions(-)
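
The gist of the change: instead of one etcd watch connection per resource, a single
long-lived watcher now reads all events and fans them out to the per-resource
consumers through a shared queue plus semaphores. The snippet below is only a
minimal sketch of that producer/consumer shape, written for this note; the names
(shared, publish, next_event) are invented for illustration and are not the
identifiers used in the patch, which keeps its state in watch_ctx:

    -- hedged sketch: one producer feeding many consumers over a shared queue
    local semaphore = require("ngx.semaphore")

    local shared = {
        queue   = {},   -- watch responses, in arrival order
        idx     = {},   -- per-consumer read position into the queue
        waiters = {},   -- consumers currently blocked on an empty queue
    }

    -- producer: called for every response read from the single watch stream
    local function publish(res)
        table.insert(shared.queue, res)
        for key, sema in pairs(shared.waiters) do
            sema:post()
            shared.waiters[key] = nil
        end
    end

    -- consumer: each resource (e.g. "/routes") pulls only what it has not seen yet
    local function next_event(key, timeout)
        shared.idx[key] = shared.idx[key] or 1
        while true do
            local i = shared.idx[key]
            if i <= #shared.queue then
                shared.idx[key] = i + 1
                return shared.queue[i]
            end
            -- caught up: block until the producer posts a new response
            local sema, err = semaphore.new()
            if not sema then
                return nil, err
            end
            shared.waiters[key] = sema
            local ok, werr = sema:wait(timeout or 60)
            shared.waiters[key] = nil
            if not ok then
                return nil, werr
            end
        end
    end

Each consumer keeps its own read index, so a slow resource never blocks the
producer; the patch applies the same idea with watch_ctx.res, watch_ctx.idx and
watch_ctx.sema, and trims the queue once every consumer has moved past an entry.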

diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua
index 4946cc5c2..ecb762704 100644
--- a/apisix/core/config_etcd.lua
+++ b/apisix/core/config_etcd.lua
@@ -27,6 +27,10 @@ local json         = require("apisix.core.json")
 local etcd_apisix  = require("apisix.core.etcd")
 local core_str     = require("apisix.core.string")
 local new_tab      = require("table.new")
+local inspect      = require("inspect")
+local errlog       = require("ngx.errlog")
+local log_level    = errlog.get_sys_filter_level()
+local NGX_INFO     = ngx.INFO
 local check_schema = require("apisix.core.schema").check
 local exiting      = ngx.worker.exiting
 local insert_tab   = table.insert
@@ -43,9 +47,14 @@ local xpcall       = xpcall
 local debug        = debug
 local string       = string
 local error        = error
+local pairs        = pairs
+local next         = next
+local assert       = assert
 local rand         = math.random
 local constants    = require("apisix.constants")
 local health_check = require("resty.etcd.health_check")
+local semaphore    = require("ngx.semaphore")
+local tablex       = require("pl.tablex")
 
 
 local is_http = ngx.config.subsystem == "http"
@@ -58,6 +67,7 @@ if not is_http then
 end
 local created_obj  = {}
 local loaded_configuration = {}
+local watch_ctx
 
 
 local _M = {
@@ -75,6 +85,208 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    if log_level >= NGX_INFO then
+        log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    end
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            if log_level >= NGX_INFO then
+                log.info("res_func: ", inspect(res))
+            end
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe"
+                then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            for i = 1, min_idx - 1 do
+                watch_ctx.res[i] = false
+            end
+
+            if min_idx > 100 then
+                for k, idx in pairs(watch_ctx.idx) do
+                    watch_ctx.idx[k] = idx - min_idx + 1
+                end
+                -- trim the res table
+                for i = 1, min_idx - 1 do
+                    table.remove(watch_ctx.res, 1)
+                end
+            end
+
+            local rev = tonumber(res.result.header.revision)
+            if rev > watch_ctx.rev then
+                watch_ctx.rev = rev + 1
+            end
+            produce_res(res)
+        end
+    end
+end
+
+
+local function init_watch_ctx(key)
+    if not watch_ctx then
+        watch_ctx = {
+            idx = {},
+            res = {},
+            sema = {},
+            wait_init = {},
+            started = false,
+        }
+        ngx_timer_at(0, run_watch)
+    end
+
+    if watch_ctx.started == false then
+        -- wait until the main watcher is started
+        local sema, err = semaphore.new()
+        if not sema then
+            error(err)
+        end
+        watch_ctx.wait_init[key] = sema
+        while true do
+            local ok, err = sema:wait(60)
+            if ok then
+                break
+            end
+            log.error("wait main watcher to start, key: ", key, ", err: ", err)
+        end
+    end
+end
+
+
 local function getkey(etcd_cli, key)
     if not etcd_cli then
         return nil, "not inited"
@@ -157,45 +369,67 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        -- ignore res with revision smaller than self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            local res2
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    if not res2 then
+                        res2 = tablex.deepcopy(res)
+                        table.clear(res2.result.events)
+                    end
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+
+            if res2 then
+                if log_level >= NGX_INFO then
+                    log.info("http_waitdir: ", inspect(res2))
+                end
+                return res2
+            end
         end
     end
 
-    if not res then
-        return nil, err
+    -- if no events, wait via semaphore
+    if not self.watch_sema then
+        local sema, err = semaphore.new()
+        if not sema then
+            error(err)
+        end
+        self.watch_sema = sema
     end
 
-    if type(res.result) ~= "table" then
-        err = "failed to wait etcd dir"
-        if res.error and res.error.message then
-            err = err .. ": " .. res.error.message
+    watch_ctx.sema[key] = self.watch_sema
+    local ok, err = self.watch_sema:wait(timeout or 60)
+    watch_ctx.sema[key] = nil
+    if ok then
+        goto iterate_events
+    else
+        if err ~= "timeout" then
+            log.error("wait watch event, key=", key, ", err: ", err)
         end
         return nil, err
     end
-
-    return res, err
 end
 
 
@@ -213,7 +447,7 @@ local function waitdir(self)
     if etcd_cli.use_grpc then
         res, err = grpc_waitdir(self, etcd_cli, key, modified_index, timeout)
     else
-        res, err = http_waitdir(etcd_cli, key, modified_index, timeout)
+        res, err = http_waitdir(self, etcd_cli, key, modified_index, timeout)
     end
 
     if not res then
@@ -359,6 +593,10 @@ local function sync_data(self)
         return nil, "missing 'key' arguments"
     end
 
+    if not self.etcd_cli.use_grpc then
+        init_watch_ctx(self.key)
+    end
+
     if self.need_reload then
         flush_watching_streams(self)
 
@@ -555,22 +793,6 @@ function _M.getkey(self, key)
 end
 
 
-local get_etcd
-do
-    local etcd_cli
-
-    function get_etcd()
-        if etcd_cli ~= nil then
-            return etcd_cli
-        end
-
-        local _, err
-        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
-        return etcd_cli, err
-    end
-end
-
-
 local function _automatic_fetch(premature, self)
     if premature then
         return
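
On the consumer side, the new http_waitdir no longer owns a connection: it walks
the shared response queue from its saved index, keeps only events whose keys fall
under its own prefix, skips revisions it has already applied, and parks on a
semaphore when the queue is drained. A hedged, stand-alone sketch of just the
filtering step (the function name filter_events is invented here and does not
appear in the patch):

    -- illustrative only: keep the events that belong to one resource prefix
    local function filter_events(res, prefix, prev_index)
        -- skip responses whose revision this resource has already applied
        if tonumber(res.result.header.revision) <= prev_index then
            return nil
        end

        local events = {}
        for _, evt in ipairs(res.result.events or {}) do
            -- keep only keys under this resource's prefix, e.g. "/apisix/routes/"
            if evt.kv.key:find(prefix, 1, true) == 1 then
                table.insert(events, evt)
            end
        end

        if #events == 0 then
            return nil   -- the response was for some other resource
        end
        return events
    end
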
diff --git a/ci/linux_openresty_runner.sh b/ci/linux_openresty_runner.sh
index 2cdc87b21..877248913 100755
--- a/ci/linux_openresty_runner.sh
+++ b/ci/linux_openresty_runner.sh
@@ -18,5 +18,5 @@
 
 
 export OPENRESTY_VERSION=source
-export TEST_CI_USE_GRPC=true
+#export TEST_CI_USE_GRPC=true
 . ./ci/linux_openresty_common_runner.sh
diff --git a/t/core/etcd-sync.t b/t/core/etcd-sync.t
index e74ae19ec..aef5e2361 100644
--- a/t/core/etcd-sync.t
+++ b/t/core/etcd-sync.t
@@ -22,65 +22,7 @@ run_tests;
 
 __DATA__
 
-=== TEST 1: minus timeout to watch repeatedly
---- yaml_config
-deployment:
-    role: traditional
-    role_traditional:
-        config_provider: etcd
-    etcd:
-        # this test requires the HTTP long pull as the gRPC stream is shared and can't change
-        # default timeout in the fly
-        use_grpc: false
-    admin:
-        admin_key: null
---- config
-    location /t {
-        content_by_lua_block {
-            local core = require("apisix.core")
-            local t = require("lib.test_admin").test
-
-            local consumers, _ = core.config.new("/consumers", {
-                automatic = true,
-                item_schema = core.schema.consumer,
-                timeout = 0.2
-            })
-
-            ngx.sleep(0.6)
-            local idx = consumers.prev_index
-
-            local code, body = t('/apisix/admin/consumers',
-                ngx.HTTP_PUT,
-                [[{
-                    "username": "jobs",
-                    "plugins": {
-                        "basic-auth": {
-                            "username": "jobs",
-                            "password": "123456"
-                        }
-                    }
-                }]])
-
-            ngx.sleep(2)
-            local new_idx = consumers.prev_index
-            core.log.info("idx:", idx, " new_idx: ", new_idx)
-            if new_idx > idx then
-                ngx.say("prev_index updated")
-            else
-                ngx.say("prev_index not update")
-            end
-        }
-    }
---- request
-GET /t
---- response_body
-prev_index updated
---- error_log eval
-qr/(create watch stream for key|cancel watch connection success)/
-
-
-
-=== TEST 2: using default timeout
+=== TEST 1: using default timeout
 --- config
     location /t {
         content_by_lua_block {
@@ -126,7 +68,7 @@ waitdir key
 
 
 
-=== TEST 3: no update
+=== TEST 2: no update
 --- config
     location /t {
         content_by_lua_block {
@@ -162,7 +104,7 @@ prev_index not update
 
 
 
-=== TEST 4: bad plugin configuration (validated via incremental sync)
+=== TEST 3: bad plugin configuration (validated via incremental sync)
 --- config
     location /t {
         content_by_lua_block {
@@ -182,7 +124,7 @@ property "uri" validation failed
 
 
 
-=== TEST 5: bad plugin configuration (validated via full sync)
+=== TEST 4: bad plugin configuration (validated via full sync)
 --- config
     location /t {
         content_by_lua_block {
@@ -196,7 +138,7 @@ property "uri" validation failed
 
 
 
-=== TEST 6: bad plugin configuration (validated without sync during start)
+=== TEST 5: bad plugin configuration (validated without sync during start)
 --- extra_yaml_config
   disable_sync_configuration_during_start: true
 --- config
diff --git a/t/plugin/error-log-logger-skywalking.t b/t/plugin/error-log-logger-skywalking.t
index ffebf1cf4..edb5003c0 100644
--- a/t/plugin/error-log-logger-skywalking.t
+++ b/t/plugin/error-log-logger-skywalking.t
@@ -118,8 +118,8 @@ qr/Batch Processor\[error-log-logger\] failed to process entries: error while se
 --- request
 GET /tg
 --- response_body
---- error_log eval
-qr/.*\[\{\"body\":\{\"text\":\{\"text\":\".*this is an error message for test.*\"\}\},\"endpoint\":\"\",\"service\":\"APISIX\",\"serviceInstance\":\"instance\".*/
+--- error_log
+this is an error message for test
 --- wait: 5