Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/11/23 06:45:44 UTC

[GitHub] [apisix] Fabriceli opened a new pull request, #8380: Feat: add consul discovery module

Fabriceli opened a new pull request, #8380:
URL: https://github.com/apache/apisix/pull/8380

   ### Description
   
   As I mentioned previously in #8371, my team is submitting our `consul` discovery module.
   
   
   ### Checklist
   
   - [x] I have explained the need for this PR and the problem it solves
   - [x] I have explained the changes or the new features added to this PR
   - [x] I have added tests corresponding to this change
   - [ ] I have updated the documentation to reflect this change
   - [x] I have verified that this change is backward compatible (If not, please discuss on the [APISIX mailing list](https://github.com/apache/apisix/tree/master#community) first)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1338501190

   The Chaos Test reports an error. Could you give me a hint about it? cc @spacewander @tzssangglass
   ```sh
   Failure [0.077 seconds]
   Test APISIX Delay When Add ETCD Delay
   /home/runner/work/apisix/apisix/t/chaos/delayetcd/delayetcd.go:100
     get default apisix delay [It]
     /home/runner/work/apisix/apisix/t/chaos/delayetcd/delayetcd.go:133
   
     Expected
         <time.Duration>: 15246514
     to be <
         <time.Duration>: 15000000
   
     /home/runner/work/apisix/apisix/t/chaos/delayetcd/delayetcd.go:136
   ```
   Error Stack: https://github.com/apache/apisix/actions/runs/3617352817/jobs/6100135416 
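
   For scale, here is a minimal sketch of the arithmetic behind the failed assertion, assuming the printed values are Go `time.Duration` nanosecond counts. The variable names are illustrative and do not come from the test itself.

   ```lua
   -- Values copied from the assertion output; the names are hypothetical.
   local NS_PER_MS = 1e6
   local actual_ns = 15246514   -- measured delay
   local limit_ns  = 15000000   -- upper bound the test expects

   local over_ns = actual_ns - limit_ns

   -- Report both values in milliseconds and how far the measurement overshoots.
   print(string.format("actual %.3f ms, limit %.3f ms, over by %.3f ms (%.2f%%)",
       actual_ns / NS_PER_MS, limit_ns / NS_PER_MS,
       over_ns / NS_PER_MS, over_ns / limit_ns * 100))
   ```

   The measurement exceeds the 15 ms bound by roughly 0.25 ms (about 1.6%), a margin small enough to be consistent with timing jitter on the runner rather than with a change in behaviour, although the log alone does not prove that.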




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1335082390

   > @Fabriceli Could you follow the discussion in [#8433 (comment)](https://github.com/apache/apisix/pull/8433#issuecomment-1334709384)? Thanks!
   
   




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1035653792


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            ", source: ", source, ", server pid: ", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4
+            end
+
+            log.warn("retry connecting consul after ", retry_delay, " seconds")
+            core_sleep(retry_delay)
+
+            goto ERR
+        end
+
+        consul_server.index = watch_result.headers['X-Consul-Index']
+        -- only long connect type use index
+        if consul_server.keepalive then
+            consul_server.default_args.index = watch_result.headers['X-Consul-Index']
+        end
+        -- decode body, decode json, update service, error handling
+        if result.body then
+            log.notice("server_name: ", consul_server.consul_server_url,
+                    ", header: ", json_delay_encode(result.headers, true),
+                    ", body: ", json_delay_encode(result.body, true))
+            update_all_services(consul_server.consul_server_url, result.body)
+            --update events
+            local ok, err = events.post(events_list._source, events_list.updating, all_services)
+            if not ok then
+                log.error("post_event failure with ", events_list._source,
+                        ", update all services error: ", err)
+            end
+
+            if dump_params then
+                ngx_timer_at(0, write_dump_services)
+            end
+        end
+    end
+
+    :: ERR ::
+    local keepalive = consul_server.keepalive
+    if keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+
+local function format_consul_params(consul_conf)
+    local consul_server_list = core.table.new(0, #consul_conf.servers)
+    local args
+
+    if consul_conf.keepalive == false then
+        args = {}
+    elseif consul_conf.keepalive then
+        args = {
+            wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
+            index = 0,
+        }
+    end
+
+    for _, v in pairs(consul_conf.servers) do
+        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
+        if scheme ~= "http" then
+            return nil, "only support consul http schema address, eg: http://address:port"
+        elseif path ~= "/" or core.string.has_suffix(v, '/') then
+            return nil, "invalid consul server address, the valid format: http://address:port"
+        end
+
+        core.table.insert(consul_server_list, {
+            host = host,
+            port = port,
+            connect_timeout = consul_conf.timeout.connect,
+            read_timeout = consul_conf.timeout.read,
+            consul_sub_url = "/agent/services",
+            consul_watch_sub_url = "/catalog/services",
+            consul_server_url = v .. "/v1",
+            weight = consul_conf.weight,
+            keepalive = consul_conf.keepalive,
+            default_args = args,
+            index = 0,
+            fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
+        })
+    end
+
+    return consul_server_list
+end
+
+
+function _M.init_worker()
+    local consul_conf = local_conf.discovery.consul
+
+    if consul_conf.dump then
+        local dump = consul_conf.dump
+        dump_params = dump
+
+        if dump.load_on_init then
+            read_dump_services()
+        end
+    end
+
+    events = require("resty.worker.events")
+    events_list = events.event_list(
+            "discovery_consul_update_all_services",
+            "updating"
+    )
+
+    if 0 ~= ngx.worker.id() then
+        events.register(discovery_consul_callback, events_list._source, events_list.updating)
+        return
+    end
+
+    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+    default_weight = consul_conf.weight
+    -- set default service, used when the server node cannot be found
+    if consul_conf.default_service then
+        default_service = consul_conf.default_service
+        default_service.weight = default_weight
+    end
+    if consul_conf.skip_services then
+        skip_service_map = core.table.new(0, #consul_conf.skip_services)
+        for _, v in ipairs(consul_conf.skip_services) do
+            skip_service_map[v] = true
+        end
+    end
+
+    local consul_servers_list, err = format_consul_params(consul_conf)
+    if err then
+        error(err)
+        return

Review Comment:
   `format_consul_params` returns an error if the configured URI is not valid
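
   A minimal sketch of that behaviour in plain Lua, for readers without the full diff at hand. `validate_consul_server` is a hypothetical stand-alone helper that uses string patterns; the PR itself relies on `resty.http`'s `parse_uri`. Only the two error messages are taken from the diff.

   ```lua
   -- Hypothetical helper: accepts only "http://address:port" with no path
   -- and no trailing slash, and returns nil plus a message otherwise.
   local function validate_consul_server(uri)
       local scheme, rest = uri:match("^(%a+)://(.+)$")
       if not scheme then
           return nil, "invalid consul server address, the valid format: http://address:port"
       end
       if scheme ~= "http" then
           return nil, "only support consul http schema address, eg: http://address:port"
       end

       -- host and numeric port must be the whole remainder of the URI
       local host, port = rest:match("^([^/:]+):(%d+)$")
       if not host then
           return nil, "invalid consul server address, the valid format: http://address:port"
       end

       return { host = host, port = tonumber(port) }
   end

   -- Usage: the caller decides whether an invalid entry is fatal.
   for _, uri in ipairs({ "http://127.0.0.1:8500", "https://127.0.0.1:8500",
                          "http://127.0.0.1:8500/" }) do
       local server, err = validate_consul_server(uri)
       if server then
           print(uri, "ok", server.host, server.port)
       else
           print(uri, "rejected:", err)
       end
   end
   ```

   Returning `nil, err` leaves the decision to the caller; in the quoted `init_worker`, the error is passed to `error(err)`, so an invalid address stops the worker from initialising.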



##########
conf/config-default.yaml:
##########
@@ -309,6 +309,29 @@ nginx_config:                     # config for render the template to generate n
 #    dump:                         # if you need, when registered nodes updated can dump into file
 #       path: "logs/consul_kv.dump"
 #       expire: 2592000            # unit sec, here is 30 day
+#  consul:
+#    servers:
+#      - "http://127.0.0.1:8500"
+#      - "http://127.0.0.1:8600"
+#    skip_services:                    # if you need to skip special keys

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1329945111

   @spacewander @tzssangglass I have finished the changes and fixed all the CI errors.




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1333638582

   @spacewander @tzssangglass TEST 7 of t/xds-library/config_xds_2.t is unstable in this [CI](https://github.com/apache/apisix/actions/runs/3590288740/jobs/6047153281) run; I did not modify anything related to it.




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039168426


##########
t/discovery/consul.t:
##########
@@ -0,0 +1,581 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('info');
+no_root_location();
+no_shuffle();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 20999;
+
+        location / {
+            content_by_lua_block {
+                ngx.say("missing consul services")
+            }
+        }
+    }
+
+    server {
+        listen 30511;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 1")
+            }
+        }
+    }
+    server {
+        listen 30512;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 2")
+            }
+        }
+    }
+    server {
+        listen 30513;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 3")
+            }
+        }
+    }
+    server {
+        listen 30514;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 4")
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  enable_control: true
+  control:
+    ip: 127.0.0.1
+    port: 9090
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"
+    skip_services:
+      - "service_c"
+    timeout:
+      connect: 1000
+      read: 1000
+      wait: 60
+    weight: 1
+    fetch_interval: 1
+    keepalive: true
+    default_service:
+      host: "127.0.0.1"
+      port: 20999
+      metadata:
+        fail_timeout: 1
+        weight: 1
+        max_fails: 1
+_EOC_
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: prepare consul catalog register nodes
+--- config
+location /consul1 {
+    rewrite  ^/consul1/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8500;
+}
+
+location /consul2 {
+    rewrite  ^/consul2/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8600;
+}
+--- pipelined_requests eval
+[
+    "PUT /consul1/deregister/service_a1",
+    "PUT /consul1/deregister/service_b1",
+    "PUT /consul1/deregister/service_a2",
+    "PUT /consul1/deregister/service_b2",
+    "PUT /consul2/deregister/service_a1",
+    "PUT /consul2/deregister/service_b1",
+    "PUT /consul2/deregister/service_a2",
+    "PUT /consul2/deregister/service_b2",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a2\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30512,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30513,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b2\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30514,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+["", "", "", "", "", "", "", "", "", "", "", ""]

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1032014328


##########
docs/en/latest/discovery/consul.md:
##########
@@ -0,0 +1,296 @@
+---
+title: consul
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+APACHE APISIX supports Consul as a service discovery
+
+## Configuration for discovery client
+
+### Configuration for Consul
+
+First of all, we need to add following configuration in `conf/config.yaml` :
+
+```yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"

Review Comment:
   They are different clusters; I added a comment about this in consul.md.
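
   A simplified sketch of what treating each address as an independent source means, modelled on the `update_all_services` function quoted elsewhere in this thread. It uses plain Lua tables instead of `core.table`, and the sample service data is made up for illustration.

   ```lua
   local all_services = {}      -- service name -> list of nodes, merged view
   local consul_services = {}   -- server URL -> services last reported by it

   -- Replace everything one server reported earlier with its latest snapshot,
   -- leaving entries contributed by other servers untouched.
   local function update_all_services(server_url, up_services)
       for name in pairs(consul_services[server_url] or {}) do
           all_services[name] = nil
       end
       for name, nodes in pairs(up_services) do
           all_services[name] = nodes
       end
       consul_services[server_url] = up_services
   end

   -- Illustrative data: two clusters, each registering a different service.
   update_all_services("http://127.0.0.1:8500",
       { service_a = { { host = "127.0.0.1", port = 30511, weight = 1 } } })
   update_all_services("http://127.0.0.1:8600",
       { service_b = { { host = "127.0.0.1", port = 30513, weight = 1 } } })

   -- The first cluster now reports nothing: only its entries are dropped.
   update_all_services("http://127.0.0.1:8500", {})

   for name, nodes in pairs(all_services) do
       print(name, #nodes)
   end
   ```

   One consequence of this merge, at least in the sketch: if two clusters register the same service name, the snapshot applied last overwrites the other rather than combining the node lists.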





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1333067921

   @spacewander @tzssangglass Updated. Please check again, thanks.




[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039116651


##########
t/discovery/consul.t:
##########
@@ -0,0 +1,581 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('info');
+no_root_location();
+no_shuffle();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 20999;
+
+        location / {
+            content_by_lua_block {
+                ngx.say("missing consul services")
+            }
+        }
+    }
+
+    server {
+        listen 30511;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 1")
+            }
+        }
+    }
+    server {
+        listen 30512;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 2")
+            }
+        }
+    }
+    server {
+        listen 30513;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 3")
+            }
+        }
+    }
+    server {
+        listen 30514;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 4")
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  enable_control: true
+  control:
+    ip: 127.0.0.1
+    port: 9090
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"
+    skip_services:
+      - "service_c"
+    timeout:
+      connect: 1000
+      read: 1000
+      wait: 60
+    weight: 1
+    fetch_interval: 1
+    keepalive: true
+    default_service:
+      host: "127.0.0.1"
+      port: 20999
+      metadata:
+        fail_timeout: 1
+        weight: 1
+        max_fails: 1
+_EOC_
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: prepare consul catalog register nodes
+--- config
+location /consul1 {
+    rewrite  ^/consul1/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8500;
+}
+
+location /consul2 {
+    rewrite  ^/consul2/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8600;
+}
+--- pipelined_requests eval
+[
+    "PUT /consul1/deregister/service_a1",
+    "PUT /consul1/deregister/service_b1",
+    "PUT /consul1/deregister/service_a2",
+    "PUT /consul1/deregister/service_b2",
+    "PUT /consul2/deregister/service_a1",
+    "PUT /consul2/deregister/service_b1",
+    "PUT /consul2/deregister/service_a2",
+    "PUT /consul2/deregister/service_b2",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a2\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30512,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30513,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b2\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30514,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+["", "", "", "", "", "", "", "", "", "", "", ""]

Review Comment:
   Would checking `--- error_code eval` be clearer?





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1326114833

   > Please make the CI pass, thanks!
   
   OK, I fixed it.




[GitHub] [apisix] spacewander commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1036714122


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4
+            end
+
+            log.warn("retry connecting consul after ", retry_delay, " seconds")
+            core_sleep(retry_delay)
+
+            goto ERR
+        end
+
+        consul_server.index = watch_result.headers['X-Consul-Index']
+        -- only long connect type use index
+        if consul_server.keepalive then
+            consul_server.default_args.index = watch_result.headers['X-Consul-Index']
+        end
+        -- decode body, decode json, update service, error handling
+        if result.body then
+            log.notice("server_name: ", consul_server.consul_server_url,
+                    ", header: ", json_delay_encode(result.headers, true),
+                    ", body: ", json_delay_encode(result.body, true))
+            update_all_services(consul_server.consul_server_url, result.body)
+            --update events
+            local ok, err = events.post(events_list._source, events_list.updating, all_services)
+            if not ok then
+                log.error("post_event failure with ", events_list._source,
+                        ", update all services error: ", err)
+            end
+
+            if dump_params then
+                ngx_timer_at(0, write_dump_services)
+            end
+        end
+    end
+
+    :: ERR ::
+    local keepalive = consul_server.keepalive
+    if keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+
+local function format_consul_params(consul_conf)
+    local consul_server_list = core.table.new(0, #consul_conf.servers)
+    local args
+
+    if consul_conf.keepalive == false then
+        args = {}
+    elseif consul_conf.keepalive then
+        args = {
+            wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
+            index = 0,
+        }
+    end
+
+    for _, v in pairs(consul_conf.servers) do
+        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
+        if scheme ~= "http" then
+            return nil, "only support consul http schema address, eg: http://address:port"
+        elseif path ~= "/" or core.string.has_suffix(v, '/') then
+            return nil, "invalid consul server address, the valid format: http://address:port"
+        end
+
+        core.table.insert(consul_server_list, {
+            host = host,
+            port = port,
+            connect_timeout = consul_conf.timeout.connect,
+            read_timeout = consul_conf.timeout.read,
+            consul_sub_url = "/agent/services",
+            consul_watch_sub_url = "/catalog/services",
+            consul_server_url = v .. "/v1",
+            weight = consul_conf.weight,
+            keepalive = consul_conf.keepalive,
+            default_args = args,
+            index = 0,
+            fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
+        })
+    end
+
+    return consul_server_list
+end
+
+
+function _M.init_worker()
+    local consul_conf = local_conf.discovery.consul
+
+    if consul_conf.dump then
+        local dump = consul_conf.dump
+        dump_params = dump
+
+        if dump.load_on_init then
+            read_dump_services()
+        end
+    end
+
+    events = require("resty.worker.events")
+    events_list = events.event_list(
+            "discovery_consul_update_all_services",
+            "updating"
+    )
+
+    if 0 ~= ngx.worker.id() then
+        events.register(discovery_consul_callback, events_list._source, events_list.updating)
+        return
+    end
+
+    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+    default_weight = consul_conf.weight
+    -- set default service, used when the server node cannot be found
+    if consul_conf.default_service then
+        default_service = consul_conf.default_service
+        default_service.weight = default_weight
+    end
+    if consul_conf.skip_services then
+        skip_service_map = core.table.new(0, #consul_conf.skip_services)
+        for _, v in ipairs(consul_conf.skip_services) do
+            skip_service_map[v] = true
+        end
+    end
+
+    local consul_servers_list, err = format_consul_params(consul_conf)
+    if err then
+        error(err)
+        return

Review Comment:
   @Fabriceli 
   Yes. I mean if the `error` is hit, the `return` here is not executed.
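
   A minimal sketch of the point above, runnable in plain Lua outside APISIX. The function name `fail_fast` and the flag `reached_after_error` are hypothetical and only illustrate that `error` raises and never returns, so anything written after it in the same branch is dead code:

   ```lua
   -- Flag that would flip only if execution continued past `error`.
   local reached_after_error = false

   -- Hypothetical stand-in for the `if err then error(err) return end` branch.
   local function fail_fast(err)
       if err then
           error(err)
           reached_after_error = true  -- never executed
           return                      -- never executed either
       end
       return "ok"
   end

   -- pcall catches the raised error so the script can inspect the result.
   local ok, msg = pcall(fail_fast, "invalid consul server address")
   print(ok, msg, reached_after_error)
   ```

   Since the `return` can never run, dropping it leaves the behavior unchanged.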





[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039112336


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))

Review Comment:
   It is better to cache `ngx.worker.id` in a local variable:
   
   ```
   local ngx_worker_id = ngx.worker.id
   
   ...
   
   log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
               json_delay_encode(resp_list, true))
   ```
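
   For readers outside OpenResty, the same pattern can be tried in plain Lua. This is only a sketch: the `ngx` table below is a stand-in defined for the example (an assumption, not the real API object), and `describe_worker` is a hypothetical helper. The idea is that the nested lookup `ngx.worker.id` is resolved once at load time instead of on every call:

   ```lua
   -- Stand-in for the OpenResty `ngx` table so the sketch runs in plain Lua.
   local ngx = { worker = { id = function() return 0 end } }

   -- Resolve the two table lookups once, when the module is loaded.
   local ngx_worker_id = ngx.worker.id

   -- Hypothetical helper that uses the cached function on each call.
   local function describe_worker(service_name)
       return "process id: " .. ngx_worker_id() .. ", service: " .. service_name
   end

   print(describe_worker("service_a"))
   ```

   One trade-off of this style is that the local keeps pointing at the original function even if `ngx.worker.id` is replaced later, which is usually the desired behavior for a hot path.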





[GitHub] [apisix] spacewander commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1326089854

   Please make the CI pass, thanks!




[GitHub] [apisix] spacewander commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1326971843

   > > Learn more.
   > 
   > Could you start the other pipeline?
   
   Done




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1331700545

   > > Could you give some tips for fixing this pipeline error: [build (ubuntu-20.04, linux_openresty, t/gm)](https://github.com/apache/apisix/actions/runs/3571445230/jobs/6004502287#logs)? I didn't modify anything related to `grpc_server_example`. Thanks.
   > 
   > rerun CI
   
   I think something went wrong with the GitHub pipeline; the 12 pipelines have been running for more than 2 hours...




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1035653987


##########
docs/en/latest/discovery/consul.md:
##########
@@ -0,0 +1,298 @@
+---
+title: consul
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+APACHE APISIX supports Consul as a service discovery
+
+## Configuration for discovery client
+
+### Configuration for Consul
+
+First of all, we need to add following configuration in `conf/config.yaml` :
+
+```yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"   # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
+    skip_services:                # if you need to skip special keys

Review Comment:
   done





[GitHub] [apisix] tzssangglass commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1338523356

   > There are some errors in the Chaos Test. Could you give some hints about that?
   
   Rerun it.




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1329938637

   @spacewander @tzssangglass I have finished, and I have fixed all the CI errors.




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1329936452

   @spacewander @tzssangglass I have finished, and I have fixed all the CI errors.




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1035652357


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4

Review Comment:
   done
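
   For reference, the retry delay in the diff above starts at 1 second and is multiplied by 4 on each further failure. A minimal standalone sketch of that backoff, assuming a hypothetical helper `next_retry_delay` that is not part of the PR (the diff computes the value inline):

   ```lua
   -- Hypothetical helper (not in the PR): mirrors the inline logic in the
   -- diff, i.e. 1 second on the first failure, then 4x on each later one.
   local function next_retry_delay(retry_delay)
       if not retry_delay then
           return 1
       end
       return retry_delay * 4
   end

   -- Walk through the first few failures to show how fast the delay grows.
   local delay
   local delays = {}
   for attempt = 1, 4 do
       delay = next_retry_delay(delay)
       delays[attempt] = delay
   end

   print(table.concat(delays, ", "))  -- prints "1, 4, 16, 64"
   ```

   The quoted part of the diff shows no upper bound on the delay, so a cap may be worth considering if Consul can be unreachable for long periods.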





[GitHub] [apisix] Fabriceli closed pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli closed pull request #8380: feat: add consul discovery module
URL: https://github.com/apache/apisix/pull/8380




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1334774451

   @spacewander @tzssangglass Could you rerun the CI? Thanks.




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039168315


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            ", source: ", source, ", server pid: ", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1326394997

   > Learn more.
   
   Could you start the other pipeline?




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1336018995

   @spacewander @tzssangglass I have merged upstream master, could you re-start the CI? Thanks




[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039118228


##########
t/discovery/consul.t:
##########
@@ -0,0 +1,581 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('info');
+no_root_location();
+no_shuffle();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 20999;
+
+        location / {
+            content_by_lua_block {
+                ngx.say("missing consul services")
+            }
+        }
+    }
+
+    server {
+        listen 30511;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 1")
+            }
+        }
+    }
+    server {
+        listen 30512;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 2")
+            }
+        }
+    }
+    server {
+        listen 30513;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 3")
+            }
+        }
+    }
+    server {
+        listen 30514;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 4")
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  enable_control: true
+  control:
+    ip: 127.0.0.1
+    port: 9090
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"
+    skip_services:
+      - "service_c"
+    timeout:
+      connect: 1000
+      read: 1000
+      wait: 60
+    weight: 1
+    fetch_interval: 1
+    keepalive: true
+    default_service:
+      host: "127.0.0.1"
+      port: 20999
+      metadata:
+        fail_timeout: 1
+        weight: 1
+        max_fails: 1
+_EOC_
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: prepare consul catalog register nodes
+--- config
+location /consul1 {
+    rewrite  ^/consul1/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8500;
+}
+
+location /consul2 {
+    rewrite  ^/consul2/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8600;
+}
+--- pipelined_requests eval
+[
+    "PUT /consul1/deregister/service_a1",
+    "PUT /consul1/deregister/service_b1",
+    "PUT /consul1/deregister/service_a2",
+    "PUT /consul1/deregister/service_b2",
+    "PUT /consul2/deregister/service_a1",
+    "PUT /consul2/deregister/service_b1",
+    "PUT /consul2/deregister/service_a2",
+    "PUT /consul2/deregister/service_b2",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a2\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30512,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30513,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b2\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30514,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+["", "", "", "", "", "", "", "", "", "", "", ""]
+
+
+
+=== TEST 2: test consul server 1
+--- yaml_config eval: $::yaml_config
+--- apisix_yaml
+routes:
+  -
+    uri: /*
+    upstream:
+      service_name: service_a
+      discovery_type: consul
+      type: roundrobin
+#END
+--- pipelined_requests eval
+[
+    "GET /hello",
+    "GET /hello",
+]
+--- response_body_like eval
+[
+    qr/server [1-2]\n/,
+    qr/server [1-2]\n/,
+]
+--- no_error_log
+[error, error]

Review Comment:
   What does this mean? cc @spacewander





[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039114460


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            ", source: ", source, ", server pid: ", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name

Review Comment:
   It looks like we don't need to define the `sn` variable in this function; `server_name` can be used directly.
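
   A minimal sketch of that simplification, assuming `sn` is only ever an alias for `server_name` in the rest of the loop (the quoted diff is cut off before that is visible). `group_services`, the node table layout, and the sample data are hypothetical and only for illustration; they are not the PR's code:

   ```lua
   -- Illustrative stand-ins for the module-level state in the PR.
   local skip_service_map = { service_c = true }
   local default_weight = 1

   -- Same idea as the PR's parse_instance, minus the unused metadata slot.
   local function parse_instance(node)
       local service_name = node.Service
       if service_name and skip_service_map[service_name] then
           return false
       end
       return true, node.Address, tonumber(node.Port), service_name
   end

   -- Group nodes by service name, using server_name directly instead of `sn`.
   local function group_services(data)
       local up_services = {}
       for _, node in pairs(data) do
           local succ, ip, port, server_name = parse_instance(node)
           -- Guard against entries without a service name (nil table key).
           if succ and server_name then
               local nodes = up_services[server_name]
               if not nodes then
                   nodes = {}
                   up_services[server_name] = nodes
               end
               nodes[#nodes + 1] = { host = ip, port = port, weight = default_weight }
           end
       end
       return up_services
   end

   local services = group_services({
       { Service = "service_a", Address = "127.0.0.1", Port = 30511 },
       { Service = "service_a", Address = "127.0.0.1", Port = 30512 },
       { Service = "service_c", Address = "127.0.0.1", Port = 30515 },
   })
   print(#services.service_a)        -- 2
   print(services.service_c == nil)  -- true (skipped)
   ```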





[GitHub] [apisix] spacewander merged pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander merged PR #8380:
URL: https://github.com/apache/apisix/pull/8380




[GitHub] [apisix] spacewander commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1334817912

   @Fabriceli 
   Could you follow the discussion in https://github.com/apache/apisix/pull/8433#issuecomment-1334709384?
   Thanks!




[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1334830897

   > @Fabriceli Could you follow the discussion in [#8433 (comment)](https://github.com/apache/apisix/pull/8433#issuecomment-1334709384)? Thanks!
   
   Done. I have merged upstream master into the dev branch.




[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039168184


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            ", source: ", source, ", server pid: ", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name

Review Comment:
   done 





[GitHub] [apisix] spacewander commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039145669


##########
t/discovery/consul.t:
##########
@@ -0,0 +1,581 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('info');
+no_root_location();
+no_shuffle();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 20999;
+
+        location / {
+            content_by_lua_block {
+                ngx.say("missing consul services")
+            }
+        }
+    }
+
+    server {
+        listen 30511;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 1")
+            }
+        }
+    }
+    server {
+        listen 30512;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 2")
+            }
+        }
+    }
+    server {
+        listen 30513;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 3")
+            }
+        }
+    }
+    server {
+        listen 30514;
+
+        location /hello {
+            content_by_lua_block {
+                ngx.say("server 4")
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  enable_control: true
+  control:
+    ip: 127.0.0.1
+    port: 9090
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"
+    skip_services:
+      - "service_c"
+    timeout:
+      connect: 1000
+      read: 1000
+      wait: 60
+    weight: 1
+    fetch_interval: 1
+    keepalive: true
+    default_service:
+      host: "127.0.0.1"
+      port: 20999
+      metadata:
+        fail_timeout: 1
+        weight: 1
+        max_fails: 1
+_EOC_
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: prepare consul catalog register nodes
+--- config
+location /consul1 {
+    rewrite  ^/consul1/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8500;
+}
+
+location /consul2 {
+    rewrite  ^/consul2/(.*) /v1/agent/service/$1 break;
+    proxy_pass http://127.0.0.1:8600;
+}
+--- pipelined_requests eval
+[
+    "PUT /consul1/deregister/service_a1",
+    "PUT /consul1/deregister/service_b1",
+    "PUT /consul1/deregister/service_a2",
+    "PUT /consul1/deregister/service_b2",
+    "PUT /consul2/deregister/service_a1",
+    "PUT /consul2/deregister/service_b1",
+    "PUT /consul2/deregister/service_a2",
+    "PUT /consul2/deregister/service_b2",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_a2\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30512,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30513,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /consul1/register\n" . "{\"ID\":\"service_b2\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30514,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+["", "", "", "", "", "", "", "", "", "", "", ""]
+
+
+
+=== TEST 2: test consul server 1
+--- yaml_config eval: $::yaml_config
+--- apisix_yaml
+routes:
+  -
+    uri: /*
+    upstream:
+      service_name: service_a
+      discovery_type: consul
+      type: roundrobin
+#END
+--- pipelined_requests eval
+[
+    "GET /hello",
+    "GET /hello",
+]
+--- response_body_like eval
+[
+    qr/server [1-2]\n/,
+    qr/server [1-2]\n/,
+]
+--- no_error_log
+[error, error]

Review Comment:
   A special `no_error_log` for pipelined_requests 





[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1031979611


##########
docs/en/latest/discovery/consul.md:
##########
@@ -0,0 +1,296 @@
+---
+title: consul
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+Apache APISIX supports Consul as a service discovery registry.
+
+## Configuration for discovery client
+
+### Configuration for Consul
+
+First, add the following configuration to `conf/config.yaml`:
+
+```yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"

Review Comment:
   Are 8500 and 8600 different Consul DNS servers that provide different query information?





[GitHub] [apisix] tzssangglass commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1326990538

   Please fix the doc lint errors.




[GitHub] [apisix] tzssangglass commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1331633997

   > Could you give some tips for fixing this pipeline error: [build (ubuntu-20.04, linux_openresty, t/gm)](https://github.com/apache/apisix/actions/runs/3571445230/jobs/6004502287#logs)? I didn't modify anything related to `grpc_server_example`. Thanks.
   
   Please rerun the CI.




[GitHub] [apisix] spacewander commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1035593762


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4

Review Comment:
   Can we wrap the get+retry into a function?
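
   One possible shape for such a helper, as a minimal sketch rather than the PR's final code. The names `next_retry_delay` and `get_or_backoff` are hypothetical, and the `sleep` function is passed in as a parameter (an assumption made so the sketch runs outside OpenResty; in the module it would be `core_sleep`). The `client` argument only needs a `get` method returning `result, err`, which is how `consul_client:get` is used in the diff above.

```lua
-- Hypothetical helper (not from the PR): same backoff rule as the diff,
-- start at 1 second and multiply by 4 on each further failure.
local function next_retry_delay(retry_delay)
    if not retry_delay then
        return 1
    end
    return retry_delay * 4
end

-- Hypothetical helper (not from the PR): one GET plus the shared error handling.
-- client: any object with get(self, url) returning result, err
-- sleep:  injected sleep function (assumption; core_sleep in the real module)
-- Returns result, error_info, retry_delay.
local function get_or_backoff(client, url, retry_delay, sleep)
    local result, err = client:get(url)
    -- same error test as in the diff: a transport error or a non-200 status
    local error_info = (err ~= nil and err)
            or ((result ~= nil and result.status ~= 200) and result.status)
    if not error_info then
        return result, nil, retry_delay
    end

    retry_delay = next_retry_delay(retry_delay)
    sleep(retry_delay)
    return nil, error_info, retry_delay
end
```

   With something like this, both call sites in `_M.connect` would reduce to one call followed by `if err then goto ERR end`, with logging kept at the call site or moved into the helper.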



##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)

Review Comment:
   ```suggestion
       local entity, err = core.json.decode(data)
   ```



##########
Makefile:
##########
@@ -283,8 +283,9 @@ install: runtime
 
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
 	$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
-	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,dns,eureka,nacos,kubernetes,tars}
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,consul,dns,eureka,nacos,kubernetes,tars}
 	$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
+	$(ENV_INSTALL) apisix/discovery/consul/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul

Review Comment:
   To keep alphabetical order, `consul` needs to come before `consul_kv`.



##########
docs/en/latest/discovery/consul.md:
##########
@@ -0,0 +1,298 @@
+---
+title: consul
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+APACHE APISIX supports Consul as a service discovery
+
+## Configuration for discovery client
+
+### Configuration for Consul
+
+First of all, we need to add following configuration in `conf/config.yaml` :
+
+```yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+      - "http://127.0.0.1:8600"   # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
+    skip_services:                # if you need to skip special keys

Review Comment:
   Ditto



##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4
+            end
+
+            log.warn("retry connecting consul after ", retry_delay, " seconds")
+            core_sleep(retry_delay)
+
+            goto ERR
+        end
+
+        consul_server.index = watch_result.headers['X-Consul-Index']
+        -- only long connect type use index
+        if consul_server.keepalive then
+            consul_server.default_args.index = watch_result.headers['X-Consul-Index']
+        end
+        -- decode body, decode json, update service, error handling
+        if result.body then
+            log.notice("server_name: ", consul_server.consul_server_url,
+                    ", header: ", json_delay_encode(result.headers, true),
+                    ", body: ", json_delay_encode(result.body, true))
+            update_all_services(consul_server.consul_server_url, result.body)
+            --update events
+            local ok, err = events.post(events_list._source, events_list.updating, all_services)
+            if not ok then
+                log.error("post_event failure with ", events_list._source,
+                        ", update all services error: ", err)
+            end
+
+            if dump_params then
+                ngx_timer_at(0, write_dump_services)
+            end
+        end
+    end
+
+    :: ERR ::
+    local keepalive = consul_server.keepalive
+    if keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+
+local function format_consul_params(consul_conf)
+    local consul_server_list = core.table.new(0, #consul_conf.servers)
+    local args
+
+    if consul_conf.keepalive == false then
+        args = {}
+    elseif consul_conf.keepalive then
+        args = {
+            wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
+            index = 0,
+        }
+    end
+
+    for _, v in pairs(consul_conf.servers) do
+        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
+        if scheme ~= "http" then
+            return nil, "only support consul http schema address, eg: http://address:port"
+        elseif path ~= "/" or core.string.has_suffix(v, '/') then
+            return nil, "invalid consul server address, the valid format: http://address:port"
+        end
+
+        core.table.insert(consul_server_list, {
+            host = host,
+            port = port,
+            connect_timeout = consul_conf.timeout.connect,
+            read_timeout = consul_conf.timeout.read,
+            consul_sub_url = "/agent/services",
+            consul_watch_sub_url = "/catalog/services",
+            consul_server_url = v .. "/v1",
+            weight = consul_conf.weight,
+            keepalive = consul_conf.keepalive,
+            default_args = args,
+            index = 0,
+            fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
+        })
+    end
+
+    return consul_server_list
+end
+
+
+function _M.init_worker()
+    local consul_conf = local_conf.discovery.consul
+
+    if consul_conf.dump then
+        local dump = consul_conf.dump
+        dump_params = dump
+
+        if dump.load_on_init then
+            read_dump_services()
+        end
+    end
+
+    events = require("resty.worker.events")
+    events_list = events.event_list(
+            "discovery_consul_update_all_services",
+            "updating"
+    )
+
+    if 0 ~= ngx.worker.id() then
+        events.register(discovery_consul_callback, events_list._source, events_list.updating)
+        return
+    end
+
+    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+    default_weight = consul_conf.weight
+    -- set default service, used when the server node cannot be found
+    if consul_conf.default_service then
+        default_service = consul_conf.default_service
+        default_service.weight = default_weight
+    end
+    if consul_conf.skip_services then
+        skip_service_map = core.table.new(0, #consul_conf.skip_services)
+        for _, v in ipairs(consul_conf.skip_services) do
+            skip_service_map[v] = true
+        end
+    end
+
+    local consul_servers_list, err = format_consul_params(consul_conf)
+    if err then
+        error(err)
+        return

Review Comment:
   Can we remove the unreachable code here?
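
   For context, Lua's `error` raises and never returns to its caller, so the `return` after it cannot run. A small standalone sketch of that behavior (the function name `init_sketch` and the message are made up for illustration):

```lua
-- Hypothetical stand-in for the error path in init_worker (not from the PR).
local function init_sketch(err)
    if err then
        error(err)   -- raises here; nothing after this line in the branch executes
    end
    return "initialised"
end

-- pcall catches the raised error and reports failure plus the message
local ok, msg = pcall(init_sketch, "invalid consul server address")
print(ok, msg)

-- without an error the function returns normally
local ok2, value = pcall(init_sketch, nil)
print(ok2, value)
```

   So dropping the `return` changes nothing at runtime.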



##########
conf/config-default.yaml:
##########
@@ -309,6 +309,29 @@ nginx_config:                     # config for render the template to generate n
 #    dump:                         # if you need, when registered nodes updated can dump into file
 #       path: "logs/consul_kv.dump"
 #       expire: 2592000            # unit sec, here is 30 day
+#  consul:
+#    servers:
+#      - "http://127.0.0.1:8500"
+#      - "http://127.0.0.1:8600"
+#    skip_services:                    # if you need to skip special keys

Review Comment:
   Special services?





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1035652170


##########
Makefile:
##########
@@ -283,8 +283,9 @@ install: runtime
 
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
 	$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
-	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,dns,eureka,nacos,kubernetes,tars}
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,consul,dns,eureka,nacos,kubernetes,tars}
 	$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
+	$(ENV_INSTALL) apisix/discovery/consul/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul

Review Comment:
   done



##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1327113109

   > pls fix doc lint
   
   done




[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1031979847


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", core.json.encode(all_services, true))

Review Comment:
   ```suggestion
               ", all services: ", core.json.delay_encode(all_services, true))
   ```
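
The motivation for a deferred encoder can be sketched as follows. This is a minimal illustration of the general idea only, not APISIX's actual implementation: `encode`, `delay_encode`, and `log` below are hypothetical stand-ins, and the logger is assumed to stringify its arguments only when the level is enabled.

```lua
-- Hypothetical stand-in for a JSON encoder; it counts how often it runs.
local encode_calls = 0
local function encode(tbl)
    encode_calls = encode_calls + 1
    local parts = {}
    for k, v in pairs(tbl) do
        parts[#parts + 1] = tostring(k) .. "=" .. tostring(v)
    end
    table.sort(parts)  -- deterministic order for the illustration
    return "{" .. table.concat(parts, ",") .. "}"
end

-- Lazy wrapper: returns a proxy whose __tostring performs the encoding,
-- so the work happens only if something actually stringifies the proxy.
local function delay_encode(tbl)
    return setmetatable({}, {
        __tostring = function() return encode(tbl) end,
    })
end

-- Hypothetical logger: formats its arguments only when the level is enabled.
local function log(enabled, ...)
    if not enabled then
        return nil
    end
    local out = {}
    for i = 1, select("#", ...) do
        out[i] = tostring((select(i, ...)))
    end
    return table.concat(out)
end

local services = { svc_a = 1, svc_b = 2 }

-- Level disabled: the proxy is built, but the encoder never runs.
local skipped = log(false, "all services: ", delay_encode(services))
local calls_after_skip = encode_calls

-- Level enabled: the encoder runs exactly once, at formatting time.
local printed = log(true, "all services: ", delay_encode(services))
```

With an eager encoder the table would be serialized on every call, even when the log line is discarded, which matters for a large service table logged on every update.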





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1032017469


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", core.json.encode(all_services, true))

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1036753790


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,418 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name
+end
+
+
+local function update_all_services(server_name_prefix, data)
+    local sn
+    local up_services = core.table.new(0, #data)
+    local weight = default_weight
+    for _, node in pairs(data) do
+        local succ, ip, port, metadata, server_name = parse_instance(node)
+        if succ then
+            sn = server_name
+            local nodes = up_services[sn]
+            if not nodes then
+                nodes = core.table.new(1, 0)
+                up_services[sn] = nodes
+            end
+            core.table.insert(nodes, {
+                host = ip,
+                port = port,
+                weight = metadata and metadata.weight or weight,
+            })
+        end
+    end
+
+    -- clean old unused data
+    local old_services = consul_services[server_name_prefix] or {}
+    for k, _ in pairs(old_services) do
+        all_services[k] = nil
+    end
+    core.table.clear(old_services)
+
+    for k, v in pairs(up_services) do
+        all_services[k] = v
+    end
+    consul_services[server_name_prefix] = up_services
+
+    log.info("update all services: ", json_delay_encode(all_services, true))
+end
+
+
+local function read_dump_services()
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        log.error("read dump file get error: ", err)
+        return
+    end
+
+    log.info("read dump file: ", data)
+    data = util.trim(data)
+    if #data == 0 then
+        log.error("dump file is empty")
+        return
+    end
+
+    local entity, err  = core.json.decode(data)
+    if not entity then
+        log.error("decoded dump data got error: ", err, ", file content: ", data)
+        return
+    end
+
+    if not entity.services or not entity.last_update then
+        log.warn("decoded dump data miss fields, file content: ", data)
+        return
+    end
+
+    local now_time = ngx.time()
+    log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
+            dump_params.expire, ", now_time: ", now_time)
+    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+        log.warn("dump file: ", dump_params.path, " had expired, ignored it")
+        return
+    end
+
+    all_services = entity.services
+    log.info("load dump file into memory success")
+end
+
+
+local function write_dump_services()
+    local entity = {
+        services = all_services,
+        last_update = ngx.time(),
+        expire = dump_params.expire, -- later need handle it
+    }
+    local data = core.json.encode(entity)
+    local succ, err =  util.write_file(dump_params.path, data)
+    if not succ then
+        log.error("write dump into file got error: ", err)
+    end
+end
+
+
+local function show_dump_file()
+    if not dump_params then
+        return 503, "dump params is nil"
+    end
+
+    local data, err = util.read_file(dump_params.path)
+    if not data then
+        return 503, err
+    end
+
+    return 200, data
+end
+
+
+function _M.connect(premature, consul_server, retry_delay)
+    if premature then
+        return
+    end
+
+    local consul_client = resty_consul:new({
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+        default_args = consul_server.default_args,
+    })
+
+    log.info("consul_server: ", json_delay_encode(consul_server, true))
+    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+                " by sub url: ", consul_server.consul_watch_sub_url,
+                ", got watch result: ", json_delay_encode(watch_result, true),
+                 ", with error: ", watch_error_info)
+
+        if not retry_delay then
+            retry_delay = 1
+        else
+            retry_delay = retry_delay * 4
+        end
+
+        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        goto ERR
+    end
+
+    log.info("connect consul: ", consul_server.consul_server_url,
+            ", watch_result status: ", watch_result.status,
+            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+            ", consul_server.index: ", consul_server.index,
+            ", consul_server: ", json_delay_encode(consul_server, true))
+
+    -- if current index different last index then update service
+    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+
+        -- fetch all services info
+        local result, err = consul_client:get(consul_server.consul_sub_url)
+
+        local error_info = (err ~= nil and err) or
+         ((result ~= nil and result.status ~= 200) and result.status)
+
+        if error_info then
+            log.error("connect consul: ", consul_server.consul_server_url,
+                    " by sub url: ", consul_server.consul_sub_url,
+                    ", got result: ", json_delay_encode(result, true),
+                    ", with error: ", error_info)
+
+            if not retry_delay then
+                retry_delay = 1
+            else
+                retry_delay = retry_delay * 4
+            end
+
+            log.warn("retry connecting consul after ", retry_delay, " seconds")
+            core_sleep(retry_delay)
+
+            goto ERR
+        end
+
+        consul_server.index = watch_result.headers['X-Consul-Index']
+        -- only long connect type use index
+        if consul_server.keepalive then
+            consul_server.default_args.index = watch_result.headers['X-Consul-Index']
+        end
+        -- decode body, decode json, update service, error handling
+        if result.body then
+            log.notice("server_name: ", consul_server.consul_server_url,
+                    ", header: ", json_delay_encode(result.headers, true),
+                    ", body: ", json_delay_encode(result.body, true))
+            update_all_services(consul_server.consul_server_url, result.body)
+            --update events
+            local ok, err = events.post(events_list._source, events_list.updating, all_services)
+            if not ok then
+                log.error("post_event failure with ", events_list._source,
+                        ", update all services error: ", err)
+            end
+
+            if dump_params then
+                ngx_timer_at(0, write_dump_services)
+            end
+        end
+    end
+
+    :: ERR ::
+    local keepalive = consul_server.keepalive
+    if keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+
+local function format_consul_params(consul_conf)
+    local consul_server_list = core.table.new(0, #consul_conf.servers)
+    local args
+
+    if consul_conf.keepalive == false then
+        args = {}
+    elseif consul_conf.keepalive then
+        args = {
+            wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
+            index = 0,
+        }
+    end
+
+    for _, v in pairs(consul_conf.servers) do
+        local scheme, host, port, path = unpack(http.parse_uri(nil, v))
+        if scheme ~= "http" then
+            return nil, "only support consul http schema address, eg: http://address:port"
+        elseif path ~= "/" or core.string.has_suffix(v, '/') then
+            return nil, "invalid consul server address, the valid format: http://address:port"
+        end
+
+        core.table.insert(consul_server_list, {
+            host = host,
+            port = port,
+            connect_timeout = consul_conf.timeout.connect,
+            read_timeout = consul_conf.timeout.read,
+            consul_sub_url = "/agent/services",
+            consul_watch_sub_url = "/catalog/services",
+            consul_server_url = v .. "/v1",
+            weight = consul_conf.weight,
+            keepalive = consul_conf.keepalive,
+            default_args = args,
+            index = 0,
+            fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
+        })
+    end
+
+    return consul_server_list
+end
+
+
+function _M.init_worker()
+    local consul_conf = local_conf.discovery.consul
+
+    if consul_conf.dump then
+        local dump = consul_conf.dump
+        dump_params = dump
+
+        if dump.load_on_init then
+            read_dump_services()
+        end
+    end
+
+    events = require("resty.worker.events")
+    events_list = events.event_list(
+            "discovery_consul_update_all_services",
+            "updating"
+    )
+
+    if 0 ~= ngx.worker.id() then
+        events.register(discovery_consul_callback, events_list._source, events_list.updating)
+        return
+    end
+
+    log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
+    default_weight = consul_conf.weight
+    -- set default service, used when the server node cannot be found
+    if consul_conf.default_service then
+        default_service = consul_conf.default_service
+        default_service.weight = default_weight
+    end
+    if consul_conf.skip_services then
+        skip_service_map = core.table.new(0, #consul_conf.skip_services)
+        for _, v in ipairs(consul_conf.skip_services) do
+            skip_service_map[v] = true
+        end
+    end
+
+    local consul_servers_list, err = format_consul_params(consul_conf)
+    if err then
+        error(err)
+        return

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1331524165

   @tzssangglass Could you give some tips for fixing this pipeline error: [build (ubuntu-20.04, linux_openresty, t/gm)](https://github.com/apache/apisix/actions/runs/3571445230/jobs/6004502287#logs)? I didn't modify anything related to `grpc_server_example`. Thanks.




[GitHub] [apisix] tzssangglass commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039113634


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name

Review Comment:
   What is the purpose of `""` here?
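
For reference, a minimal sketch of how the `""` placeholder behaves once it reaches the `weight = metadata and metadata.weight or weight` expression in `update_all_services` from this PR. It assumes stock Lua 5.1/LuaJIT string semantics; the sample node and the default weight of 1 are made up for illustration.

```lua
-- Mirrors the shape of the function in the diff: "" fills the metadata slot.
local function parse_instance(node)
    return true, node.Address, tonumber(node.Port), "", node.Service
end

local default_weight = 1  -- assumed value, for illustration only

local succ, ip, port, metadata, name = parse_instance({
    Service = "svc_a", Address = "127.0.0.1", Port = "8080",
})

-- "" is truthy in Lua, and indexing a string falls through to the
-- `string` library table, which has no `weight` field. The lookup
-- therefore yields nil and the `or` branch supplies the default.
local weight = metadata and metadata.weight or default_weight
```

Under these assumptions the placeholder only holds a position in the return list: it can never carry a weight, so every node ends up with the default.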





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039167972


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            "source: ", source, "server pid:", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))

Review Comment:
   done





[GitHub] [apisix] Fabriceli commented on a diff in pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on code in PR #8380:
URL: https://github.com/apache/apisix/pull/8380#discussion_r1039152569


##########
apisix/discovery/consul/init.lua:
##########
@@ -0,0 +1,416 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local require            = require
+local local_conf         = require("apisix.core.config_local").local_conf()
+local core               = require("apisix.core")
+local core_sleep         = require("apisix.core.utils").sleep
+local resty_consul       = require('resty.consul')
+local http               = require('resty.http')
+local util               = require("apisix.cli.util")
+local ipairs             = ipairs
+local error              = error
+local ngx                = ngx
+local unpack             = unpack
+local tonumber           = tonumber
+local pairs              = pairs
+local ngx_timer_at       = ngx.timer.at
+local ngx_timer_every    = ngx.timer.every
+local log                = core.log
+local json_delay_encode  = core.json.delay_encode
+
+local all_services = core.table.new(0, 5)
+local default_service
+local default_weight
+local skip_service_map = core.table.new(0, 1)
+local dump_params
+
+local events
+local events_list
+local consul_services
+
+local _M = {
+    version = 0.1,
+}
+
+
+local function discovery_consul_callback(data, event, source, pid)
+    all_services = data
+    log.notice("update local variable all_services, event is: ", event,
+            ", source: ", source, ", server pid: ", pid,
+            ", all services: ", json_delay_encode(all_services, true))
+end
+
+
+function _M.all_nodes()
+    return all_services
+end
+
+
+function _M.nodes(service_name)
+    if not all_services then
+        log.error("all_services is nil, failed to fetch nodes for : ", service_name)
+        return
+    end
+
+    local resp_list = all_services[service_name]
+
+    if not resp_list then
+        log.error("fetch nodes failed by ", service_name, ", return default service")
+        return default_service and {default_service}
+    end
+
+    log.info("process id: ", ngx.worker.id(), ", all_services[", service_name, "] = ",
+            json_delay_encode(resp_list, true))
+
+    return resp_list
+end
+
+
+local function parse_instance(node)
+    local service_name, host, port = node.Service, node.Address, node.Port
+    -- if exist, skip special service name
+    if service_name and skip_service_map[service_name] then
+        return false
+    end
+    return true, host, tonumber(port), "", service_name

Review Comment:
   That value is for metadata: it specifies arbitrary KV metadata linked to the service instance.
   I added a comment to the `init.lua` code.
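
   A minimal sketch of how that return shape can be read, assuming the comment refers to the empty string returned in the fourth position of `parse_instance`. The `skip_service_map` contents and the sample node below are hypothetical, chosen only for illustration; this is not the PR's final code.

   ```lua
   -- Hypothetical stand-in for the module-level skip list in the diff above.
   local skip_service_map = { consul = true }

   local function parse_instance(node)
       local service_name, host, port = node.Service, node.Address, node.Port
       -- skip services whose names are listed in skip_service_map
       if service_name and skip_service_map[service_name] then
           return false
       end
       -- 4th return value: placeholder for arbitrary KV metadata
       -- linked to the service instance (empty string here)
       return true, host, tonumber(port), "", service_name
   end

   -- Sample node (assumed shape: Service, Address, Port fields).
   local ok, host, port, metadata, name = parse_instance({
       Service = "service_a", Address = "127.0.0.1", Port = "8080",
   })
   assert(ok == true)
   assert(host == "127.0.0.1" and port == 8080)
   assert(metadata == "" and name == "service_a")

   -- A skipped service yields only `false`.
   assert(parse_instance({ Service = "consul" }) == false)
   ```

   Keeping the slot in the return list, even while it is empty, lets callers destructure a fixed five-value shape.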





[GitHub] [apisix] Fabriceli commented on pull request #8380: feat: add consul discovery module

Posted by GitBox <gi...@apache.org>.
Fabriceli commented on PR #8380:
URL: https://github.com/apache/apisix/pull/8380#issuecomment-1338671439

   Please take a look at this CR, thanks, cc @spacewander 

