Posted to notifications@apisix.apache.org by "kingluo (via GitHub)" <gi...@apache.org> on 2023/05/10 17:22:21 UTC

[GitHub] [apisix] kingluo opened a new pull request, #9456: feat(config_etcd): use only one http connection to watch all resources

kingluo opened a new pull request, #9456:
URL: https://github.com/apache/apisix/pull/9456

   ### Description
   
   <!-- Please include a summary of the change and which issue is fixed. -->
   <!-- Please also include relevant motivation and context. -->
   
   1. Use only one HTTP connection to watch the prefix for all resources.
   2. Use chunked streaming to receive events over that connection (timeout=60s).
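To illustrate item 2, here is a minimal Python sketch of consuming a chunked watch stream over a single connection (the PR itself is Lua). It assumes each watch response arrives as a newline-terminated JSON object that may be split across chunks. The message shape is simplified, and `iter_watch_responses` / `events_only` are hypothetical names, not lua-resty-etcd's API.

```python
import json
from typing import Iterable, Iterator


def iter_watch_responses(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Reassemble newline-delimited JSON messages from arbitrary chunks.

    Assumption: one JSON object per line; a chunk may carry part of a
    message or several messages.
    """
    buf = b""
    for chunk in chunks:
        buf += chunk
        while b"\n" in buf:
            line, buf = buf.split(b"\n", 1)
            if line.strip():
                yield json.loads(line)


def events_only(responses: Iterable[dict]) -> Iterator[dict]:
    """Skip the initial 'created' acknowledgement; stop on cancellation."""
    for res in responses:
        result = res.get("result", {})
        if result.get("created"):
            continue  # the watch ack carries no events
        if result.get("canceled"):
            return  # a caller would re-create the watch here
        yield result


# Two messages, the second split across two chunks.
chunks = [
    b'{"result":{"created":true}}\n{"result":{"hea',
    b'der":{"revision":"5"},"events":[]}}\n',
]
out = list(events_only(iter_watch_responses(chunks)))
```

Because the buffer only yields complete lines, chunk boundaries never need to line up with message boundaries.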
   
   ### Checklist
   
   - [x] I have explained the need for this PR and the problem it solves
   - [x] I have explained the changes or the new features added to this PR
   - [ ] I have added tests corresponding to this change
   - [ ] I have updated the documentation to reflect this change
   - [x] I have verified that this change is backward compatible (If not, please discuss on the [APISIX mailing list](https://github.com/apache/apisix/tree/master#community) first)
   
   <!--
   
   Note
   
   1. Mark the PR as draft until it's ready to be reviewed.
   2. Always add/update tests for any changes unless you have a good reason.
   3. Always update the documentation to reflect the changes made in the PR.
   4. Make a new commit to resolve conversations instead of `push -f`.
   5. To resolve merge conflicts, merge master instead of rebasing.
   6. Use "request review" to notify the reviewer after making changes.
   7. Only a reviewer can mark a conversation as resolved.
   
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201494111


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +85,208 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    if log_level >= NGX_INFO then
+        log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    end

Review Comment:
   Why not use `json.delay_encode` here? 





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199003106


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                watch_ctx.rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            log.info("res_func: ", inspect(res))
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe" then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            for i = 1,min_idx-1 do

Review Comment:
   ```suggestion
               for i = 1, min_idx-1 do
   ```





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201492784


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   Fixed.





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1193009388


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                watch_ctx.rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second

Review Comment:
   50 seconds was chosen to stay below the default [proxy_read_timeout](http://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_read_timeout) of 60 seconds, so that nginx does not print error logs such as:
   
   ```
   2023/05/10 23:43:44 [error] 3668019#3668019: *809 upstream timed out (110: Connection timed out) while reading upstream, client: unix:, server: , request: "POST /v3/watch HTTP/1.1", upstream: "http://127.0.0.1:2379/v3/watch", host: "127.0.0.1"
   ```





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201492291


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   1. `min_idx` may be 0. It is internal state of the main watcher, computed from the consumer indexes of all resources.
   2. After the first watch iteration, each resource's consumer index is kept in sync with the global `min_idx`.
   3. The sync gap is small (currently at most 100), so the overhead is negligible.
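A minimal Python sketch of the bookkeeping described above, assuming one shared response list, a 1-based read index per resource key, and a cleanup pass that reclaims every slot below the smallest index. `SharedEventQueue` and its methods are hypothetical names; the Lua code under review uses `watch_ctx.res` and `watch_ctx.idx`, and marks reclaimed slots with `false` (modelled here as `None`).

```python
from typing import Dict, List, Optional


class SharedEventQueue:
    """One producer (the single watcher) appends responses; each consumer
    (one per resource key) reads at its own pace via its own index."""

    def __init__(self) -> None:
        self.items: List[Optional[dict]] = []  # None marks a reclaimed slot
        self.idx: Dict[str, int] = {}          # next 1-based index per consumer

    def produce(self, res: dict) -> None:
        self.items.append(res)

    def consume(self, key: str) -> List[dict]:
        """Return every unread, non-reclaimed item for this consumer."""
        start = self.idx.setdefault(key, 1)
        out = [self.items[i - 1] for i in range(start, len(self.items) + 1)
               if self.items[i - 1] is not None]
        self.idx[key] = len(self.items) + 1
        return out

    def cleanup(self) -> int:
        """Reclaim slots all consumers have read; 0 means no consumer yet."""
        min_idx = min(self.idx.values(), default=0)
        for i in range(1, min_idx):  # same bounds as `for i = 1, min_idx-1`
            self.items[i - 1] = None
        return min_idx


q = SharedEventQueue()
q.produce({"rev": 1})
q.produce({"rev": 2})
a = q.consume("/routes")      # reads revs 1 and 2
q.produce({"rev": 3})
b = q.consume("/upstreams")   # first read: revs 1, 2 and 3
min_after = q.cleanup()       # both consumers are past slot 2
c = q.consume("/routes")      # only rev 3 is new for /routes
```

Marking slots instead of deleting them keeps every consumer's index valid without renumbering.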





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1191434579


##########
t/plugin/error-log-logger-skywalking.t:
##########
@@ -118,8 +118,8 @@ qr/Batch Processor\[error-log-logger\] failed to process entries: error while se
 --- request
 GET /tg
 --- response_body
---- error_log eval
-qr/.*\[\{\"body\":\{\"text\":\{\"text\":\".*this is an error message for test.*\"\}\},\"endpoint\":\"\",\"service\":\"APISIX\",\"serviceInstance\":\"instance\".*/

Review Comment:
   The new watch subsystem may print additional logs.
   In general, asserting on exact log output is not a good practice.



##########
t/fuzzing/public.py:
##########
@@ -39,7 +39,10 @@ def check_log():
     apisix_errorlog = apisix_pwd() + "/logs/error.log"
     apisix_accesslog = apisix_pwd() + "/logs/access.log"
 
-    cmds = ['cat %s | grep -a "error" | grep -v "invalid request body"'%apisix_errorlog, 'cat %s | grep -a " 500 "'%apisix_accesslog]
+    cmds = ['cat %s | grep -a "error" \

Review Comment:
   Filter out timeout logs from the upstream connections used for watching.
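The original check greps the error log for "error" and excludes "invalid request body"; the updated command is truncated in this message. A hypothetical Python equivalent of that kind of filter, with an extra exclusion for the `/v3/watch` upstream timeout line quoted in this thread, might look like this. The patterns and the second sample log line are illustrative assumptions, not the PR's actual filter.

```python
import re
from typing import Iterable, List

# Hypothetical exclusions: the first mirrors the original check, the second
# targets timeouts on the upstream connection used for etcd watching.
IGNORED = [
    re.compile(r"invalid request body"),
    re.compile(r"upstream timed out .*/v3/watch"),
]


def unexpected_errors(lines: Iterable[str]) -> List[str]:
    """Return lines containing 'error' that match no ignore pattern."""
    return [
        line for line in lines
        if "error" in line and not any(p.search(line) for p in IGNORED)
    ]


log_lines = [
    '2023/05/10 23:43:44 [error] 3668019#3668019: *809 upstream timed out '
    '(110: Connection timed out) while reading upstream, client: unix:, '
    'server: , request: "POST /v3/watch HTTP/1.1", '
    'upstream: "http://127.0.0.1:2379/v3/watch", host: "127.0.0.1"',
    '2023/05/10 23:50:00 [error] something else went wrong (made-up line)',
    '2023/05/10 23:50:01 [warn] main etcd watcher started, revision=2',
]
bad = unexpected_errors(log_lines)
```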





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199011256


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +361,69 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key],#watch_ctx.res do

Review Comment:
   ```suggestion
       for i = watch_ctx.idx[key], #watch_ctx.res do
   ```





[GitHub] [apisix] kingluo commented on pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on PR #9456:
URL: https://github.com/apache/apisix/pull/9456#issuecomment-1554779156

   > Why do you call this "long http connection", the http_cli will be recreated every time: https://github.com/api7/lua-resty-etcd/blob/master/lib/resty/etcd/v3.lua#L803?
   
   Please read the code carefully.
   Once the HTTP connection for watching is created, it keeps receiving watch event chunks until it times out or etcd reports a compaction.
   So isn't it a long connection?
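A Python sketch of the connection lifecycle described above: one watch connection delivers many events and is reopened only after a timeout or a compaction, resuming from the next revision. The `(kind, value)` stream, `watch_loop`, and `fake_open` are hypothetical stand-ins for the HTTP chunk stream, not the Lua implementation.

```python
from typing import Callable, Iterator, List, Tuple

# open_watch(start_rev) yields (kind, value) until the connection ends:
# ("event", revision), ("timeout", 0) or ("compacted", compact_revision).
OpenWatch = Callable[[int], Iterator[Tuple[str, int]]]


def watch_loop(open_watch: OpenWatch, start_rev: int,
               max_connections: int) -> Tuple[List[int], int, int]:
    """Return (revisions seen, connections opened, next start revision)."""
    seen: List[int] = []
    rev = start_rev
    connections = 0
    while connections < max_connections:
        connections += 1
        for kind, value in open_watch(rev):  # one long-lived connection
            if kind == "event":
                seen.append(value)
                rev = value + 1   # resume after the last delivered revision
            elif kind == "compacted":
                rev = value       # older history is gone
                break
            else:                 # "timeout": client-side deadline reached
                break
    return seen, connections, rev


def fake_open(start: int) -> Iterator[Tuple[str, int]]:
    script = {
        1: [("event", 1), ("event", 2), ("timeout", 0)],
        3: [("event", 3), ("timeout", 0)],
        4: [("compacted", 10)],
    }
    return iter(script.get(start, [("timeout", 0)]))


seen, conns, rev = watch_loop(fake_open, start_rev=1, max_connections=3)
```

The first connection carries two events before it is reopened, which is the "long connection" behavior under discussion.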




[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201449493


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   There is no need to create a new table (and replace a newly created sub-table) here, because it does not necessarily match the events in the queue.
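For context, here is a minimal Python sketch of per-key filtering. It assumes a queued response may carry events for several resource prefixes, and it returns a filtered copy so the shared queue entry is never mutated. `filter_for_key` is a hypothetical name. `startswith` plays the role of `evt.kv.key:find(key) == 1` in the diff, except that Lua's `find` without the plain flag treats `key` as a pattern.

```python
import copy
from typing import Optional


def filter_for_key(res: dict, key: str, prev_index: int) -> Optional[dict]:
    """Copy of `res` keeping only events under `key`, or None if the
    response is stale for this consumer or has no matching event."""
    result = res["result"]
    if int(result["header"]["revision"]) <= prev_index:
        return None  # already applied by this consumer
    if not any(e["kv"]["key"].startswith(key) for e in result["events"]):
        return None
    res2 = copy.deepcopy(res)  # leave the shared queue entry untouched
    res2["result"]["events"] = [
        e for e in res2["result"]["events"] if e["kv"]["key"].startswith(key)
    ]
    return res2


res = {"result": {"header": {"revision": "7"}, "events": [
    {"kv": {"key": "/apisix/routes/1"}},
    {"kv": {"key": "/apisix/upstreams/1"}},
]}}
routes = filter_for_key(res, "/apisix/routes", prev_index=5)
stale = filter_for_key(res, "/apisix/routes", prev_index=7)
missing = filter_for_key(res, "/apisix/ssls", prev_index=0)
```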





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201549536


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +85,208 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    if log_level >= NGX_INFO then
+        log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    end

Review Comment:
   The delayed-encode helper has a bug: https://github.com/apache/apisix/blob/a943c036987aa2e9a34f05060015ff65b8913345/apisix/core/json.lua#L116-L120
   It reuses a single singleton table for logging, but here I need to log two variables.
   Also, `inspect` output is more informative than JSON for debugging.
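A minimal Python model of the problem described above. It assumes, as the comment says, that the helper reuses one shared wrapper. When two values are passed in a single log call, both placeholders point at the same wrapper, so both render the last value. All names here are hypothetical.

```python
class DelayEncode:
    """Defers formatting until the logger actually renders the message."""

    def __init__(self) -> None:
        self.data = None

    def __str__(self) -> str:
        return repr(self.data)


_singleton = DelayEncode()


def delay_encode_singleton(data):
    # Reuses ONE wrapper for every call: a later call overwrites earlier data.
    _singleton.data = data
    return _singleton


def delay_encode_fresh(data):
    # One wrapper per call, so several values can be logged together.
    w = DelayEncode()
    w.data = data
    return w


def render(*parts) -> str:
    # Stand-in for a logger that stringifies arguments only at write time.
    return "".join(str(p) for p in parts)


broken = render("res: ", delay_encode_singleton({"a": 1}),
                ", err: ", delay_encode_singleton("timeout"))
fixed = render("res: ", delay_encode_fresh({"a": 1}),
               ", err: ", delay_encode_fresh("timeout"))
```

Both `delay_encode_singleton` calls run before `render` formats anything, so `broken` shows `'timeout'` twice.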





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1193006630


##########
ci/linux_openresty_runner.sh:
##########
@@ -18,5 +18,5 @@
 
 
 export OPENRESTY_VERSION=source
-export TEST_CI_USE_GRPC=true
+#export TEST_CI_USE_GRPC=true

Review Comment:
   We are going to focus on the etcd HTTP tests first.





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1198969861


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                watch_ctx.rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            log.info("res_func: ", inspect(res))
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe" then
+                    log.error("wait watch event: ", err)
+                end

Review Comment:
   ```suggestion
                   if err ~= "closed" and
                       err ~= "timeout" and
                       err ~= "broken pipe" 
                   then
                       log.error("wait watch event: ", err)
                   end
   ```
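   
   The suggestion only changes the layout of the condition. If more benign errors are added later, a lookup set keeps the check flat. The sketch below is a hypothetical alternative, not code from the PR: `benign_watch_errors`, `is_benign_watch_error` and `report` are illustrative names, and `logger` stands in for `log.error`.
   
   ```lua
   -- Hypothetical: errors that are expected when the long-lived watch
   -- connection ends normally and therefore should not be logged.
   local benign_watch_errors = {
       ["closed"] = true,
       ["timeout"] = true,
       ["broken pipe"] = true,
   }
   
   local function is_benign_watch_error(err)
       -- reading t[nil] is allowed in Lua and simply yields nil
       return benign_watch_errors[err] == true
   end
   
   -- `logger` stands in for log.error; only unexpected errors are reported.
   local function report(err, logger)
       if not is_benign_watch_error(err) then
           logger("wait watch event: " .. tostring(err))
       end
   end
   ```
   
   A set lookup is O(1) and adding a new benign error is a one-line change, at the cost of moving the list away from the call site.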





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1200230058


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +85,208 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    if log_level >= NGX_INFO then
+        log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    end
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                watch_ctx.rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()

Review Comment:
   This is a very good design: the server response is a stream, so as long as we don't close the connection here, we can receive the response events one by one.
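   
   A minimal sketch of why keeping the connection open lets the caller pull events one at a time. It assumes the body arrives as newline-delimited messages and uses a hypothetical `read_chunk` callback in place of the real HTTP client. It is not how lua-resty-etcd implements `watchdir`; it only illustrates the iterator shape that `res_func` has in the loop above.
   
   ```lua
   -- Hypothetical sketch: turn a stream of raw chunks into an iterator that
   -- returns one complete newline-delimited message per call. `read_chunk`
   -- stands in for reading the open HTTP body; it returns nil when the
   -- connection ends.
   local function make_res_func(read_chunk)
       local buffer = ""
       return function()
           while true do
               local nl = buffer:find("\n", 1, true)  -- plain search, no patterns
               if nl then
                   local line = buffer:sub(1, nl - 1)
                   buffer = buffer:sub(nl + 1)
                   return line
               end
               local chunk, err = read_chunk()
               if not chunk then
                   return nil, err or "closed"
               end
               buffer = buffer .. chunk
           end
       end
   end
   
   -- Simulated body: the second message is split across two chunks.
   local chunks = { '{"rev":1}\n{"re', 'v":2}\n', '{"rev":3}\n' }
   local n = 0
   local function fake_read_chunk()
       n = n + 1
       return chunks[n]
   end
   
   local res_func = make_res_func(fake_read_chunk)
   ```
   
   Each call returns one complete message even when it spans two chunks, and the sketch reuses the `closed` error string once the stream ends, matching one of the errors the watcher loop treats as benign.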





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201470844


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   We don't need to iterate over `res.result.events` from the beginning again.
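   
   One way to address this is a single pass that only allocates the filtered copy when the first matching event is seen, as in the sketch below. Assumptions: `filter_events` is a hypothetical name, and unlike the PR (which deep-copies the whole response with `tablex.deepcopy`) it builds a shallow copy that shares `header` and has its own `events` array. It also compares the prefix with `string.sub` instead of `evt.kv.key:find(key) == 1`, because `find` treats `key` as a Lua pattern, so characters such as `-` or `.` in a prefix would be interpreted as pattern items.
   
   ```lua
   -- Hypothetical single-pass filter: walk res.result.events once and only
   -- allocate the filtered copy when the first matching key is seen.
   local function filter_events(res, key, prev_index)
       -- ignore responses whose revision is not newer than prev_index
       if tonumber(res.result.header.revision) <= prev_index then
           return nil
       end
   
       local filtered
       for _, evt in ipairs(res.result.events) do
           -- plain prefix comparison, no pattern matching involved
           if evt.kv.key:sub(1, #key) == key then
               if not filtered then
                   -- shallow copy: reuse header, fresh events array
                   filtered = { result = { header = res.result.header, events = {} } }
               end
               local events = filtered.result.events
               events[#events + 1] = evt
           end
       end
       return filtered  -- nil when no event matched
   end
   ```
   
   The original response is left untouched, so other resources consuming the same queue entry still see every event.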





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1193006630


##########
ci/linux_openresty_runner.sh:
##########
@@ -18,5 +18,5 @@
 
 
 export OPENRESTY_VERSION=source
-export TEST_CI_USE_GRPC=true
+#export TEST_CI_USE_GRPC=true

Review Comment:
   We should focus on etcd HTTP testing now.





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199011586


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +361,69 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key],#watch_ctx.res do
+        watch_ctx.idx[key] = i+1

Review Comment:
   ```suggestion
           watch_ctx.idx[key] = i + 1
   ```





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201450085


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   It's used to initialize the idx of each resource. `min_idx` is the global state for all resources. What do you mean?





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201492784


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   Fixed.





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201492291


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   1. `min_idx` may be 0. It is internal state of the main watcher and is derived from the consume idx of all resources, not the other way around.
   2. After the first watch iteration, the consume idx of each resource is kept in sync with the global `min_idx`.
   3. The sync gap is small (currently at most 100), so the overhead should be negligible.
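   
   A minimal sketch of the scheme described in these points, under stated assumptions: one producer appends to a shared array, each resource keeps its own consume index (starting at 1), and `compact` replaces entries that every registered resource has already read with `false`. The names `new_queue`, `produce`, `consume`, `compact` and the `released` field are hypothetical and not taken from the PR.
   
   ```lua
   -- Hypothetical shared queue: one producer, one consume index per resource.
   local function new_queue()
       return { res = {}, idx = {}, released = 0 }
   end
   
   local function produce(q, item)
       q.res[#q.res + 1] = item
   end
   
   -- Return the next non-false item for `key`, or nil when caught up.
   local function consume(q, key)
       local i = q.idx[key] or 1  -- a new resource starts at the earliest entry
       while i <= #q.res do
           local item = q.res[i]
           i = i + 1
           if item ~= false then
               q.idx[key] = i
               return item
           end
       end
       q.idx[key] = i
       return nil
   end
   
   -- Replace entries that every registered resource has read with `false`
   -- and return the smallest consume index (nil if nobody is registered).
   local function compact(q)
       local min_idx = math.huge
       for _, i in pairs(q.idx) do
           if i < min_idx then
               min_idx = i
           end
       end
       if min_idx == math.huge then
           return nil
       end
       for i = q.released + 1, min_idx - 1 do
           q.res[i] = false
       end
       if min_idx - 1 > q.released then
           q.released = min_idx - 1
       end
       return min_idx
   end
   ```
   
   Because `false` is not `nil`, released slots do not change `#q.res`, so existing consume indexes stay valid; a resource that registers later still starts at 1 and walks over the `false` slots, which is the overhead discussed in point 3.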





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201459572


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   The consume idx of each resource must start from 1, because it needs to consume the earliest event!





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201459572


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   Each resource must start from 1, because it needs to consume the earliest event!





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201469980


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   ```suggestion
           local found = false
           local res2
           if tonumber(res.result.header.revision) > self.prev_index then
               for _, evt in ipairs(res.result.events) do
                   if evt.kv.key:find(key) == 1 then
                       if not found then 
                           res2 = tablex.deepcopy(res)
                           table.clear(res2.result.events)
                           found = true
                       end
   
                       insert_tab(res2.result.events, evt)
                   end
               end
           end
   ```





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201485215


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   So we could store `min_idx` and let a new resource start from it, because the events before `min_idx` are `false`, right?
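   
   A small sketch of the equivalence being asked about, assuming every entry before `min_idx` has already been set to `false`: a consumer that starts at 1 and skips `false` entries collects the same items as one that starts at `min_idx`, and differs only in how many slots it walks over. `collect_from` and the sample queue are hypothetical.
   
   ```lua
   -- Hypothetical: collect the non-false items from `start` onward and count
   -- how many released (`false`) slots were walked over on the way.
   local function collect_from(queue, start)
       local out, skipped = {}, 0
       for i = start, #queue do
           if queue[i] == false then
               skipped = skipped + 1
           else
               out[#out + 1] = queue[i]
           end
       end
       return out, skipped
   end
   
   -- Entries 1..3 were already consumed by every resource and released.
   local queue = { false, false, false, "e4", "e5" }
   local min_idx = 4
   
   local from_one, skipped_one = collect_from(queue, 1)
   local from_min, skipped_min = collect_from(queue, min_idx)
   ```
   
   Whether storing `min_idx` is worth it then depends on how large that gap can get; an earlier reply in this thread puts the current maximum at 100.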





[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201448807


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   ```suggestion
           local events = {}
           if tonumber(res.result.header.revision) > self.prev_index then
               for _, evt in ipairs(res.result.events) do
                   if evt.kv.key:find(key) == 1 then
                       insert_tab(events, evt)
                   end
               end
           end
   
           if not core.table.isempty(events) then
               local res2 = tablex.deepcopy(res)
               res2.result.events = events
               return res2
           end
   ```



##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   Could we store `min_idx` and assign it here?





[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201449493


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1
     end
 
-    -- in etcd v3, the 1st res of watch is watch info, useless to us.
-    -- try twice to skip create info
-    local res, err = res_func()
-    if not res or not res.result or not res.result.events then
-        res, err = res_func()
-    end
+    ::iterate_events::
+    for i = watch_ctx.idx[key], #watch_ctx.res do
+        watch_ctx.idx[key] = i + 1
 
-    if http_cli then
-        local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
-        if res_cancel == 1 then
-            log.info("cancel watch connection success")
-        else
-            log.error("cancel watch failed: ", err_cancel)
+        local item = watch_ctx.res[i]
+        if item == false then
+            goto iterate_events
+        end
+
+        local res, err = item.res, item.err
+        if err then
+            return res, err
+        end
+
+        local found = false
+        -- ignore res with revision smaller then self.prev_index
+        if tonumber(res.result.header.revision) > self.prev_index then
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    found = true
+                    break
+                end
+            end
+        end
+
+        if found then
+            local res2 = tablex.deepcopy(res)
+            table.clear(res2.result.events)
+            for _, evt in ipairs(res.result.events) do
+                if evt.kv.key:find(key) == 1 then
+                    insert_tab(res2.result.events, evt)
+                end
+            end
+            if log_level >= NGX_INFO then
+                log.info("http_waitdir: ", inspect(res2))
+            end
+            return res2
         end

Review Comment:
   No need to create a new table (and replace a newly created sub-table) here.
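One possible reading of the comment above: the revision check, the "found" scan and the filtering can all happen in a single pass that builds only a new `events` list, instead of `tablex.deepcopy` followed by `table.clear`. The plain-Lua sketch below assumes `res` has the `result.header.revision` / `result.events[i].kv.key` shape used in the diff; `filter_events` and its parameters are hypothetical names, not part of the PR.

```lua
-- Hypothetical helper (not from the PR): return a result that holds only the
-- events under `prefix`, or nil if the response is stale or has no match.
local function filter_events(res, prefix, prev_index)
    -- revisions arrive as strings in the JSON response (the diff also
    -- calls tonumber() on them)
    if tonumber(res.result.header.revision) <= prev_index then
        return nil
    end

    local matched = {}
    for _, evt in ipairs(res.result.events) do
        -- plain find (4th arg = true): characters such as '-' or '.' in the
        -- prefix are matched literally instead of as Lua pattern items
        if evt.kv.key:find(prefix, 1, true) == 1 then
            matched[#matched + 1] = evt
        end
    end

    if #matched == 0 then
        return nil
    end

    -- Shallow wrapper: the header and event tables are shared with `res`,
    -- and `res` itself is never modified, so other watchers reading the
    -- same queue item still see every event.
    return { result = { header = res.result.header, events = matched } }
end

local res = {
    result = {
        header = { revision = "12" },
        events = {
            { kv = { key = "/apisix/routes/1" } },
            { kv = { key = "/apisix/upstreams/1" } },
        },
    },
}
local routes = filter_events(res, "/apisix/routes", 10)
print(#routes.result.events, #res.result.events)  -- prints 1 and 2
```

The plain-find flag is a robustness choice: with pattern-mode `find(key)`, a configured prefix containing a magic character such as `-` would appear not to match its own keys.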

[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201495208


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   ok

[GitHub] [apisix] monkeyDluffy6017 merged pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 merged PR #9456:
URL: https://github.com/apache/apisix/pull/9456

[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201460829


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   But haven't the resources before min_idx already been cleaned?
   ```
               for i = 1, min_idx - 1 do
                   watch_ctx.res[i] = false
               end
   ```

[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1201474534


##########
apisix/core/config_etcd.lua:
##########
@@ -157,45 +369,71 @@ local function flush_watching_streams(self)
 end
 
 
-local function http_waitdir(etcd_cli, key, modified_index, timeout)
-    local opts = {}
-    opts.start_revision = modified_index
-    opts.timeout = timeout
-    opts.need_cancel = true
-    local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
-    if not res_func then
-        return nil, func_err
+local function http_waitdir(self, etcd_cli, key, modified_index, timeout)
+    if not watch_ctx.idx[key] then
+        watch_ctx.idx[key] = 1

Review Comment:
   It skips the `false` items. By the time it starts matching, the queue may not have been cleaned yet.
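The exchange above can be illustrated with a toy model of the shared queue: each watched key keeps its own cursor into one array, slots that every cursor has passed are overwritten with `false`, and a reader (including one that registers late and starts at index 1) simply skips those slots. This is a plain-Lua sketch; `queue`, `produce`, `consume` and `cleanup` are hypothetical stand-ins for `watch_ctx.res`, `watch_ctx.idx`, `produce_res` and the cleanup loop in `run_watch`, with semaphores and error entries left out.

```lua
-- Toy model of the shared watch queue (hypothetical names, not the PR's code).
local queue = {
    res = {},  -- shared list of watch responses; consumed slots become false
    idx = {},  -- per-key cursor: index of the next slot that key will read
}

-- Append one response (the real code also wakes waiting semaphores).
local function produce(item)
    queue.res[#queue.res + 1] = item
end

-- Return the next live item for `key`, skipping slots already set to false.
local function consume(key)
    if not queue.idx[key] then
        queue.idx[key] = 1  -- a newly registered key starts at the front
    end
    for i = queue.idx[key], #queue.res do
        queue.idx[key] = i + 1
        local item = queue.res[i]
        if item ~= false then
            return item
        end
    end
    return nil  -- nothing new for this key yet
end

-- Mark every slot that all registered cursors have passed as false.
local function cleanup()
    local min_idx = 0
    for _, i in pairs(queue.idx) do
        if min_idx == 0 or i < min_idx then
            min_idx = i
        end
    end
    for i = 1, min_idx - 1 do
        queue.res[i] = false
    end
    return min_idx
end

produce("ev1")
produce("ev2")
print(consume("routes"))     -- ev1
print(consume("routes"))     -- ev2
cleanup()                    -- slots 1 and 2 are now false
produce("ev3")
print(consume("upstreams"))  -- ev3 (late reader starts at 1, skips false slots)
```

The sketch only shows the mechanics: a reader that registers after a cleanup never sees the events in the `false` slots, and whether that is safe is exactly the question raised in the review.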

[GitHub] [apisix] kingluo commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "kingluo (via GitHub)" <gi...@apache.org>.
kingluo commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1191877838


##########
t/core/etcd-sync.t:
##########
@@ -22,65 +22,7 @@ run_tests;
 
 __DATA__
 
-=== TEST 1: minus timeout to watch repeatedly
---- yaml_config
-deployment:
-    role: traditional
-    role_traditional:
-        config_provider: etcd
-    etcd:
-        # this test requires HTTP long polling as the gRPC stream is shared and can't change
-        # the default timeout on the fly
-        use_grpc: false
-    admin:
-        admin_key: null
---- config
-    location /t {
-        content_by_lua_block {
-            local core = require("apisix.core")
-            local t = require("lib.test_admin").test
-
-            local consumers, _ = core.config.new("/consumers", {
-                automatic = true,
-                item_schema = core.schema.consumer,
-                timeout = 0.2
-            })
-
-            ngx.sleep(0.6)
-            local idx = consumers.prev_index
-
-            local code, body = t('/apisix/admin/consumers',
-                ngx.HTTP_PUT,
-                [[{
-                    "username": "jobs",
-                    "plugins": {
-                        "basic-auth": {
-                            "username": "jobs",
-                            "password": "123456"
-                        }
-                    }
-                }]])
-
-            ngx.sleep(2)
-            local new_idx = consumers.prev_index
-            core.log.info("idx:", idx, " new_idx: ", new_idx)
-            if new_idx > idx then
-                ngx.say("prev_index updated")
-            else
-                ngx.say("prev_index not update")
-            end
-        }
-    }
---- request
-GET /t
---- response_body
-prev_index updated
---- error_log eval
-qr/(create watch stream for key|cancel watch connection success)/

Review Comment:
   Deprecated test: the etcd watch is now a single long-lived connection.
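With one connection serving every resource type, each event has to be fanned out to all watchers. In the patch, `produce_res` appends to `watch_ctx.res`, posts every semaphore in `watch_ctx.sema`, and then clears that table. The sketch below mimics that flow in plain Lua under stated assumptions: coroutines stand in for `ngx.semaphore`, and all names are hypothetical.

```lua
-- Toy fan-out model (hypothetical names): one producer, many parked watchers.
-- Coroutines stand in for ngx.semaphore; this is not OpenResty code.
local events = {}   -- shared event list, filled by the single watch connection
local waiters = {}  -- coroutines parked until the next event arrives

-- Like produce_res: append, wake every parked watcher, reset the wait list.
local function produce(ev)
    events[#events + 1] = ev
    local pending = waiters
    waiters = {}  -- woken watchers re-register here if they wait again
    for _, co in ipairs(pending) do
        assert(coroutine.resume(co))
    end
end

-- A watcher reads every event it has not seen yet, then parks itself.
local function new_watcher(name, out)
    return coroutine.create(function()
        local seen = 0
        while true do
            while seen < #events do
                seen = seen + 1
                out[#out + 1] = name .. ":" .. events[seen]
            end
            waiters[#waiters + 1] = coroutine.running()
            coroutine.yield()
        end
    end)
end

local out = {}
local routes = new_watcher("routes", out)
local ssls = new_watcher("ssls", out)
coroutine.resume(routes)  -- nothing to read yet; parks itself
coroutine.resume(ssls)
produce("put /routes/1")
print(table.concat(out, ", "))  -- routes:put /routes/1, ssls:put /routes/1
```

Because `coroutine.resume` runs the watcher immediately (whereas, as far as we know, `sema:post()` only schedules the waiting thread), the sketch swaps in a fresh `waiters` table before resuming rather than clearing it afterwards.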

[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199005319


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. tostring(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            log.info("res_func: ", inspect(res))
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe" then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            for i = 1,min_idx-1 do
+                watch_ctx.res[i] = false
+            end
+
+            if min_idx > 100 then
+                for k, idx in pairs(watch_ctx.idx) do
+                    watch_ctx.idx[k] = idx-min_idx+1

Review Comment:
   ```suggestion
                       watch_ctx.idx[k] = idx - min_idx + 1
   ```
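The suggested line rebases each cursor once the first `min_idx - 1` slots are dropped from the queue: the slot at old index `i` moves to `i - (min_idx - 1)`, i.e. `i - min_idx + 1`. A minimal plain-Lua check of that arithmetic, using a hypothetical `rebase_cursors` helper that mirrors the loop in the hunk:

```lua
-- Hypothetical helper: after removing the first (min_idx - 1) queue slots,
-- shift every cursor so it still points at the same logical item.
local function rebase_cursors(idx, min_idx)
    for k, i in pairs(idx) do
        -- assigning to existing keys during pairs() is allowed in Lua
        idx[k] = i - min_idx + 1
    end
end

-- Worked example: cursors at 103 and 101, min_idx = 101 (the smallest cursor).
local cursors = { routes = 103, ssls = 101 }
rebase_cursors(cursors, 101)
print(cursors.routes, cursors.ssls)  -- prints 3 and 1
```

The slowest reader always lands on index 1 after rebasing, which is what makes it safe to trim everything before it.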

[GitHub] [apisix] monkeyDluffy6017 commented on a diff in pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199008097


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. tostring(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            log.info("res_func: ", inspect(res))
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe" then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", watch_ctx.rev)
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            for i = 1,min_idx-1 do
+                watch_ctx.res[i] = false
+            end
+
+            if min_idx > 100 then
+                for k, idx in pairs(watch_ctx.idx) do
+                    watch_ctx.idx[k] = idx-min_idx+1
+                end
+                -- trim the res table
+                for i = 1,min_idx-1 do

Review Comment:
   ```suggestion
                   for i = 1, min_idx - 1 do
   ```
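The quoted hunk ends at the head of the trim loop, so its body is not visible here. As a hypothetical alternative to dropping the front slot one at a time (for example with repeated `table.remove(t, 1)`, which shifts the whole array on every call), the first `n` slots can be removed in a single pass. A plain-Lua sketch; `trim_front` is not from the PR:

```lua
-- Hypothetical helper: drop the first n slots of array t in one pass.
local function trim_front(t, n)
    local len = #t
    if n > len then
        n = len
    end
    -- move slots n+1..len down to 1..len-n
    for i = n + 1, len do
        t[i - n] = t[i]
    end
    -- clear the now-unused tail so #t shrinks to len - n
    for i = len - n + 1, len do
        t[i] = nil
    end
end

local q = { false, false, "c", "d" }
trim_front(q, 2)
print(#q, q[1], q[2])  -- prints 2, c, d (tab-separated)
```

`false` entries are ordinary values, so `#t` stays well defined while consumed slots are still marked but not yet trimmed.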

[GitHub] [apisix] monkeyDluffy6017 commented on pull request #9456: feat(config_etcd): use a single long http connection to watch all resources

Posted by "monkeyDluffy6017 (via GitHub)" <gi...@apache.org>.
monkeyDluffy6017 commented on PR #9456:
URL: https://github.com/apache/apisix/pull/9456#issuecomment-1554771715

   Why do you call this a "long http connection"? The http_cli is recreated every time: https://github.com/api7/lua-resty-etcd/blob/master/lib/resty/etcd/v3.lua#L803