You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by "soulbird (via GitHub)" <gi...@apache.org> on 2023/04/26 02:09:05 UTC

[GitHub] [apisix] soulbird commented on a diff in pull request #9204: fix(consul): support to fetch only health endpoint

soulbird commented on code in PR #9204:
URL: https://github.com/apache/apisix/pull/9204#discussion_r1177209512


##########
apisix/discovery/consul/init.lua:
##########
@@ -175,81 +188,309 @@ local function get_retry_delay(retry_delay)
     return retry_delay
 end
 
+local function watch_catalog(consul_server)
+    local opts
+    if consul_server.keepalive then
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+            default_args = {
+                wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+                index = consul_server.catalog_index,
+            },
+        }
+    else
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        }
+    end
+    local client = resty_consul:new(opts)
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got watch result: ", json_delay_encode(watch_result, true),
+            ", with error: ", watch_error_info)
+
+        return watch_type_catalog, default_catalog_error_index
+    end
+    if consul_server.catalog_index > 0
+            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_seed)
+        log.warn("watch catalog has no change, retry call consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+    return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+local function watch_health(consul_server)
+    local opts
+    if consul_server.keepalive then
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+            default_args = {
+                wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+                index = consul_server.health_index,
+            },
+        }
+    else
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        }
+    end
+    local client = resty_consul:new(opts)
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_health_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)

Review Comment:
   It would be better to write this as an `if`/`else` statement; the readability of this expression is too poor.



##########
apisix/discovery/consul/init.lua:
##########
@@ -175,81 +188,309 @@ local function get_retry_delay(retry_delay)
     return retry_delay
 end
 
+local function watch_catalog(consul_server)
+    local opts
+    if consul_server.keepalive then
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+            default_args = {
+                wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+                index = consul_server.catalog_index,
+            },
+        }
+    else
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        }
+    end
+    local client = resty_consul:new(opts)
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got watch result: ", json_delay_encode(watch_result, true),
+            ", with error: ", watch_error_info)
+
+        return watch_type_catalog, default_catalog_error_index
+    end
+    if consul_server.catalog_index > 0
+            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_seed)
+        log.warn("watch catalog has no change, retry call consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+    return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+local function watch_health(consul_server)
+    local opts
+    if consul_server.keepalive then

Review Comment:
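   It looks like you could write a `get_opts` function here, since `watch_catalog` and `watch_health` build the same `opts` table and differ only in the `index` value.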



##########
apisix/discovery/consul/init.lua:
##########
@@ -175,81 +188,309 @@ local function get_retry_delay(retry_delay)
     return retry_delay
 end
 
+local function watch_catalog(consul_server)
+    local opts
+    if consul_server.keepalive then
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+            default_args = {
+                wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+                index = consul_server.catalog_index,
+            },
+        }
+    else
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        }
+    end
+    local client = resty_consul:new(opts)
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got watch result: ", json_delay_encode(watch_result, true),
+            ", with error: ", watch_error_info)
+
+        return watch_type_catalog, default_catalog_error_index
+    end
+    if consul_server.catalog_index > 0
+            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_seed)
+        log.warn("watch catalog has no change, retry call consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+    return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+local function watch_health(consul_server)
+    local opts
+    if consul_server.keepalive then
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+            default_args = {
+                wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+                index = consul_server.health_index,
+            },
+        }
+    else
+        opts = {
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        }
+    end
+    local client = resty_consul:new(opts)
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_health_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_health_url,
+            ", got watch result: ", json_delay_encode(watch_result, true),
+            ", with error: ", watch_error_info)
+
+        return watch_type_health, default_health_error_index
+    end
+    if consul_server.health_index > 0
+            and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_seed)
+        log.warn("watch health has no change, retry call consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+    return watch_type_health, watch_result.headers['X-Consul-Index']
+end
+
+local function check_keepalive(consul_server, retry_delay)
+    if consul_server.keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+local function update_index(consul_server, catalog_index, health_index)
+    local c_index = 0
+    local h_index = 0
+    if catalog_index ~= nil then
+        c_index = tonumber(catalog_index)
+    end
+
+    if health_index ~= nil then
+        h_index = tonumber(health_index)
+    end
+
+    if c_index > 0 then
+        consul_server.catalog_index = c_index
+    end
+
+    if h_index > 0 then
+        consul_server.health_index = h_index
+    end
+end
+
+local function is_not_empty(value)
+    if value == nil or value == null
+            or (type(value) == "table" and not next(value))
+            or (type(value) == "string" and value == "") then
+        return false
+    end
+
+    return true
+end
+
+local function watch_result_is_valid(watch_type, index, catalog_index, health_index)
+    if index <= 0 then
+        return false
+    end
+
+    if watch_type == watch_type_catalog then
+        if index == catalog_index then
+            return false
+        end
+    else
+        if index == health_index then
+            return false
+        end
+    end
+
+    return true
+end
 
 function _M.connect(premature, consul_server, retry_delay)
     if premature then
         return
     end
 
+    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server)
+    if not catalog_thread then
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err)
+        local random_delay = math_random(default_random_seed)
+        log.warn("failed to spawn thread watch catalog, retry connecting consul after ",

Review Comment:
   Duplicate log: this `log.warn` repeats the spawn failure already reported by the `log.error` on the line above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org