You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by im...@apache.org on 2022/06/10 06:06:24 UTC
[incubator-shenyu-nginx] branch main updated: support discover shenyu instances from nacos (#9)
This is an automated email from the ASF dual-hosted git repository.
impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git
The following commit(s) were added to refs/heads/main by this push:
new 9246251 support discover shenyu instances from nacos (#9)
9246251 is described below
commit 9246251b37a0483b2b2e67f31cbcdd56bc25504a
Author: Luke.Z <10...@users.noreply.github.com>
AuthorDate: Fri Jun 10 14:06:20 2022 +0800
support discover shenyu instances from nacos (#9)
* support discover shenyu instances from nacos
* update
---
example/{ => etcd}/nginx.conf | 4 +-
example/{ => nacos}/nginx.conf | 10 +-
lib/shenyu/client.lua | 278 ----------------------------------
lib/shenyu/register/nacos.lua | 233 ++++++++++++++++++++++++++++
rockspec/shenyu-nginx-main-0.rockspec | 2 +-
5 files changed, 240 insertions(+), 287 deletions(-)
diff --git a/example/nginx.conf b/example/etcd/nginx.conf
similarity index 95%
copy from example/nginx.conf
copy to example/etcd/nginx.conf
index 868cfcf..ef2c410 100644
--- a/example/nginx.conf
+++ b/example/etcd/nginx.conf
@@ -30,7 +30,7 @@ http {
register.init({
shenyu_storage = ngx.shared.shenyu_storage,
balancer_type = "chash",
- etcd_base_url = "http://192.168.0.106:2379",
+ etcd_base_url = "http://127.0.0.1:2379",
})
}
@@ -49,5 +49,3 @@ http {
}
}
}
-
-
diff --git a/example/nginx.conf b/example/nacos/nginx.conf
similarity index 84%
rename from example/nginx.conf
rename to example/nacos/nginx.conf
index 868cfcf..d954917 100644
--- a/example/nginx.conf
+++ b/example/nacos/nginx.conf
@@ -26,18 +26,20 @@ http {
lua_shared_dict shenyu_storage 1m;
init_worker_by_lua_block {
- local register = require("shenyu.register.etcd")
+ local register = require("shenyu.register.nacos")
register.init({
shenyu_storage = ngx.shared.shenyu_storage,
balancer_type = "chash",
- etcd_base_url = "http://192.168.0.106:2379",
+ nacos_base_url = "http://127.0.0.1:8848",
+ username = "nacos",
+ password = "nacos",
})
}
upstream shenyu {
server 0.0.0.1;
balancer_by_lua_block {
- require("shenyu.register.etcd").pick_and_set_peer()
+ require("shenyu.register.nacos").pick_and_set_peer()
}
}
@@ -49,5 +51,3 @@ http {
}
}
}
-
-
diff --git a/lib/shenyu/client.lua b/lib/shenyu/client.lua
deleted file mode 100644
index a3759c4..0000000
--- a/lib/shenyu/client.lua
+++ /dev/null
@@ -1,278 +0,0 @@
-local http = require "http"
-local json = require("cjson.safe")
-local encode_base64 = ngx.encode_base64
-local decode_base64 = ngx.decode_base64
-local lrucache = require("resty.lrucache.pureffi")
-
-local ngx_time = ngx.time
-local ngx_timer_at = ngx.timer.at
-local ngx_worker_exiting = ngx.worker.exiting
-
-local re = ngx.re.match
-
-local ngx = ngx
-local log = ngx.log
-local ERR = ngx.ERR
-local WARN = ngx.WARN
-local INFO = ngx.INFO
-
-local _M = {
- start_key = "/shenyu/register/instance/ ",
- end_key = "/shenyu/register/instance/~",
- revision = 0,
-}
-
--- lua_shared_dict shenyu_server_list 1m
--- local _M.server_list = ngx.shared.shenyu_server_list
-
--- conf = {
--- balance_type = "chash",
--- shenyu_server_list = {},
--- etcd_base_url = "http://127.0.0.1:2379",
--- }
-function _M.init(conf)
- if ngx.worker.id ~= 0 then
- return
- end
-
- if conf.balancer_type == "chash" then
- local balancer = require("resty.chash")
- _M.build_upstream_servers = function(servers)
- log(ERR, "not support yet.")
- end
- _M.balancer = balancer
- else
- local balancer = require("resty.roundrobin")
- _M.build_upstream_servers = function(servers)
- balancer:reinit(servers)
- end
- _M.balancer = balancer
- end
-
- -- Start the etcd watcher
- --local ok, err = ngx_timer_at(0, watch)
- --if not ok then
- -- log(ERR, "failed to start watch: " .. err)
- --end
-end
-
-
-function pick_and_set_peer()
- local server = _M.balancer.find()
- balancer.set_current_peer(server)
-end
-
-
-local function parse_base_url(base_url)
- local m, err = re(base_url, [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
- if not m then
- log(ERR, err)
- end
-
- local base_url = m[1] .. "://" .. m[2] .. ":" .. m[3]
- return {
- scheme = m[1],
- host = m[2],
- port = tonumber(m[3]),
- base_url = base_url,
- prefix = detect_etcd_version(base_url),
- }
-end
-
-
--- <= 3.2 /v3alpha
--- = 3.3 /v3beta
--- >= 3.4 /v3
-local function detect_etcd_version(base_url)
- local httpc = http.new()
- local res, err = httpc:request_uri(base_url .. "/version")
- if not res then
- log(ERR, "failed to get version from etcd server.", err)
- end
-
- local m
- local response = json.decode(res.body)
- m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
- if not m then
- log(ERR, "failed to resolve etcd version.", err)
- end
-
- if tonumber(m[1]) ~= 3 then
- log(ERR, "support etcd v3 only.")
- end
-
- local ver_minor = tonumber(m[2])
- if ver_minor <= 2 then
- return "/v3alpha"
- elseif ver_minor == 3 then
- return "/v3beta"
- else
- return "/v3"
- end
-end
-
-
-local function fetch_shenyu_instances(etcd_conf, shenyu_server_list)
- local server_list = shenyu_server_list
-
- local range_request = {
- key = encode_base64(start_key),
- range_end = encode_base64(end_key),
- }
- local httpc = http.new()
- local res, err = httpc.request_uri(etcd_conf.base_url .. etcd_conf.prefix .. "/kv/range", {
- method = "POST",
- body = json.encode(range_request),
- })
-
- if not res then
- log(ERR, "failed to list shenyu instances from etcd", err)
- end
-
- -- server_list = {
- -- ["host:port"] = {
- -- host,
- -- port,
- -- ...
- -- }
- -- }
- local kvs = json.decode(res.body).kvs
- for _, kv in pairs(kvs) do
- update_revision(kv.mod_revision)
- server_list:set(kv.key, parse_value(kv.value))
- end
-
- build_server_list(server_list)
-end
-
-
-local function update_revision(mode_revision, force)
- if force and revision > mod_revision then
- log(ERR, "failed to update revision because the revision greater than specific")
- return
- end
- revision = mod_revision
-end
-
-
-local function watch(premature, etcd_conf, watching)
- if premature or ngx_worker_exiting() then
- return
- end
-
- local httpc = http.new()
- if not watching then
- local etcd_conf = parse_base_url(conf.etcd_base_url)
-
- _M.server_list = conf.shenyu_server_list
- fetch_shenyu_instances(etcd_conf, server_list)
- return
- end
-
- local ok, err = httpc:connect(etcd_conf.host, etcd_conf.port, {
- scheme = etcd_conf.scheme,
- })
- if not ok then
- -- return nil, "faliled to connect to etcd server", err
- log(ERR, "faliled to connect to etcd server", err)
- end
-
- -- message WatchCreateRequest {
- -- bytes key = 1;
- -- bytes range_end = 2;
- -- int64 start_revision = 3;
- -- bool progress_notify = 4;
- -- enum FilterType {
- -- NOPUT = 0;
- -- NODELETE = 1;
- -- }
- -- repeated FilterType filters = 5;
- -- bool prev_kv = 6;
- -- }
- local request = {
- create_request = {
- key = encode_base64(start_key),
- range_end = encode_base64(end_key),
- start_revision = _M.revision,
- }
- }
-
- local res, err = httpc:request({
- path = "/v3/watch",
- method = "POST",
- body = json.encode(request),
- })
- if not res then
- log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
- end
-
- local reader = res.body_reader
- local buffer_size = 8192
-
- repeat
- local buffer, err = reader(buffer_size)
- if err then
- if err == "timeout" then
- ngx.log(ngx.ERROR, "============", err)
- end
- ngx.log(ngx.ERROR, err)
- end
-
- if buffer then
- print(buffer)
- parse_watch_response(buffer)
- end
- until not buffer
-
- local ok, err = ngx_timer_at(1, watch, etcd_conf, true)
- if not ok then
- log(ERR, "faield start watch: ", err)
- end
-end
-
-local function parse_watch_response(response_body)
- -- message WatchResponse {
- -- ResponseHeader header = 1;
- -- int64 watch_id = 2;
- -- bool created = 3;
- -- bool canceled = 4;
- -- int64 compact_revision = 5;
-
- -- repeated mvccpb.Event events = 11;
- -- }
- local response = json.decode(response_body)
- local events = response.events
-
- -- not updated
- if not events then
- return
- end
-
- local server_list = _M.shenyu_server_list
- -- message Event {
- -- enum EventType {
- -- PUT = 0;
- -- DELETE = 1;
- -- }
- -- EventType type = 1;
- -- KeyValue kv = 2;
- -- KeyValue prev_kv = 3;
- -- }
- for _, event in pairs(events) do
- local kv = event.kv
- update_revision(kv.mod_revision, true)
-
- -- event.type: delete
- if event.type == 1 then
- log(INFO, "remove shenyu server instance[" .. kv.key .. "].")
- server_list:delete(kv.key)
- else
- server_list:set(kv.key, 1)
- end
- end
-
- build_upstream_servers(server_list)
-end
-
-
-return _M
diff --git a/lib/shenyu/register/nacos.lua b/lib/shenyu/register/nacos.lua
new file mode 100644
index 0000000..5f6901b
--- /dev/null
+++ b/lib/shenyu/register/nacos.lua
@@ -0,0 +1,233 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+local http = require("resty.http")
+local json = require("cjson.safe")
+local ngx_balancer = require("ngx.balancer")
+
+local balancer = require("shenyu.register.balancer")
+
+local ngx = ngx
+
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+
+
+local log = ngx.log
+local ERR = ngx.ERR
+local INFO = ngx.INFO
+
+_M.access_token = nil
+
+--- Log in to Nacos and obtain an access token.
+-- Sends `POST /nacos/v1/auth/login` to `_M.nacos_base_url`.
+-- @param username Nacos user name
+-- @param password Nacos password
+-- @return the `accessToken` field of the login response on success
+-- @return nil plus an error: the transport error, the response body when the
+--         HTTP status is >= 400, or a message when the body cannot be used
+local function login(username, password)
+    if not username or not password then
+        return nil, "nacos username/password is not configured"
+    end
+
+    -- Create the HTTP client here; the module never defines a shared `httpc`.
+    local httpc = http.new()
+    local res, err = httpc:request_uri(_M.nacos_base_url, {
+        method = "POST",
+        path = "/nacos/v1/auth/login",
+        -- Passing a table (instead of a hand-built string) lets the HTTP
+        -- client URL-encode credentials that contain reserved characters.
+        query = { username = username, password = password },
+    })
+    if not res then
+        return nil, err
+    end
+
+    if res.status >= 400 then
+        return nil, res.body
+    end
+
+    -- cjson.safe returns nil (plus a message) on malformed input instead of
+    -- raising, so the result must be checked before it is indexed.
+    local body, decode_err = json.decode(res.body)
+    if type(body) ~= "table" or not body.accessToken then
+        return nil, decode_err or "no accessToken in nacos login response"
+    end
+
+    return body.accessToken
+end
+
+--- Fetch the healthy instances of a service from Nacos.
+-- Sends `GET /nacos/v1/ns/instance/list` to `_M.nacos_base_url`, adding
+-- `_M.access_token` to the request when one is set.
+-- @param serviceName name of the service to look up (required)
+-- @param groupName   optional group name, defaults to ""
+-- @param namespaceId optional namespace id, defaults to ""
+-- @param clusters    optional cluster list, defaults to ""
+-- @return on success: a table mapping "ip:port" to the instance weight, with
+--         the extra key "_length_" holding the number of entries, followed by
+--         the `lastRefTime` field of the response
+-- @return on failure: nil, nil, err
+local function get_server_list(serviceName, groupName, namespaceId, clusters)
+    if not serviceName then
+        return nil, nil, "nacos service name is not configured"
+    end
+
+    local query = {
+        serviceName = serviceName,
+        groupName = groupName or "",
+        namespaceId = namespaceId or "",
+        clusters = clusters or "",
+        -- The Nacos parameter is spelled `healthyOnly`.
+        healthyOnly = "true",
+    }
+    -- NOTE(review): Nacos appears to read the token from the `accessToken`
+    -- query parameter; the header is kept as well for compatibility.
+    if _M.access_token then
+        query.accessToken = _M.access_token
+    end
+
+    -- Create the HTTP client here; the module never defines a shared `httpc`.
+    local httpc = http.new()
+    local res, err = httpc:request_uri(_M.nacos_base_url, {
+        method = "GET",
+        path = "/nacos/v1/ns/instance/list",
+        query = query,
+        headers = {
+            ["accessToken"] = _M.access_token,
+        }
+    })
+
+    if not res then
+        return nil, nil, err
+    end
+
+    if res.status ~= 200 then
+        return nil, nil, res.body
+    end
+
+    -- The body is JSON text and has to be decoded (not encoded) before use.
+    local list_inst_resp, decode_err = json.decode(res.body)
+    if type(list_inst_resp) ~= "table" then
+        return nil, nil, decode_err or "unexpected response from nacos"
+    end
+
+    local hosts = list_inst_resp.hosts
+    if type(hosts) ~= "table" then
+        -- a missing or null `hosts` field is treated as "no instances"
+        hosts = {}
+    end
+
+    local server_list = {}
+    local count = 0
+    for _, inst in ipairs(hosts) do
+        if inst.ip and inst.port then
+            -- Keyed by address because the picked key is later handed to
+            -- ngx.balancer.set_current_peer by pick_and_set_peer.
+            local addr = inst.ip .. ":" .. inst.port
+            if server_list[addr] == nil then
+                count = count + 1
+            end
+            server_list[addr] = inst.weight
+        end
+    end
+    -- number of distinct entries, so callers can compare list sizes
+    server_list["_length_"] = count
+
+    return server_list, list_inst_resp.lastRefTime
+end
+
+-- conf = {
+--   shenyu_storage = ngx.shared.shenyu_storage,
+--   balancer_type = "chash",
+--   nacos_base_url = "http://127.0.0.1:8848",
+--   username = "nacos",
+--   password = "nacos",
+--   namespace = "",
+--   service_name = "",
+--   group_name = "",
+--   clusters = "",
+-- }
+
+-- Returns true when `server_list` (with "_length_" already removed) differs
+-- from the list published last time.
+local function has_changed(server_list, servers_length)
+    local known = _M.server_list
+    if not known or _M.servers_length ~= servers_length then
+        return true
+    end
+    -- Same number of entries: the lists are equal exactly when every new
+    -- entry is already known with the same weight.
+    for srv, weight in pairs(server_list) do
+        if known[srv] ~= weight then
+            return true
+        end
+    end
+    return false
+end
+
+-- Applies `server_list` to the local balancer and publishes it, together with
+-- `revision`, to the shared dict so the other workers can pick it up in sync().
+local function publish(server_list, servers_length, revision, first)
+    if first then
+        _M.balancer:init(server_list)
+    else
+        _M.balancer:reinit(server_list)
+    end
+    _M.balancer_ready = true
+
+    _M.server_list = server_list
+    _M.servers_length = servers_length
+    _M.revision = revision
+
+    -- The list is stored before the revision: sync() reads the revision first
+    -- and only then fetches the list.
+    _M.storage:set("server_list", json.encode(server_list))
+    _M.storage:set("revision", revision)
+end
+
+-- Runs one polling round. Returns true once the first instance list has been
+-- published, false while initialization still has to be retried.
+local function refresh(initialized)
+    if not initialized and _M.username then
+        local token, err = login(_M.username, _M.password)
+        if not token then
+            log(ERR, "failed to login to nacos: ", err)
+            return false
+        end
+        _M.access_token = token
+    end
+
+    local server_list, revision, err = get_server_list(_M.service_name, _M.group_name, _M.namespace, _M.clusters)
+    if not server_list then
+        log(ERR, "failed to list shenyu instances from nacos: ", err)
+        return initialized and true or false
+    end
+
+    local servers_length = server_list["_length_"]
+    server_list["_length_"] = nil
+
+    if initialized and not has_changed(server_list, servers_length) then
+        return true
+    end
+
+    -- Fall back to a local counter when the response carries no revision;
+    -- storing nil in the shared dict would hide the update from sync().
+    revision = revision or ((_M.revision or 0) + 1)
+    publish(server_list, servers_length, revision, not initialized)
+    return true
+end
+
+-- Timer callback on worker 0: polls Nacos and re-arms itself every 2 seconds.
+-- `initialized` is threaded through the timer arguments between rounds.
+local function subscribe(premature, initialized)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    initialized = refresh(initialized)
+
+    local ok, err = ngx_timer_at(2, subscribe, initialized)
+    if not ok then
+        log(ERR, "failed to subscribe: ", err)
+    end
+end
+
+-- Timer callback on the other workers: copies the instance list from the
+-- shared dict into the local balancer whenever the stored revision is newer
+-- than the one applied last, then re-arms itself every 2 seconds.
+local function sync(premature)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    local storage = _M.storage
+    local ver = storage:get("revision")
+
+    -- `ver` is nil until worker 0 has published its first list
+    if ver and ver > (_M.revision or 0) then
+        local server_list = storage:get("server_list")
+        local servers = server_list and json.decode(server_list)
+        if servers then
+            if not _M.balancer_ready then
+                _M.balancer:init(servers)
+                _M.balancer_ready = true
+            else
+                _M.balancer:reinit(servers)
+            end
+            _M.revision = ver
+        end
+    end
+
+    local ok, err = ngx_timer_at(2, sync)
+    if not ok then
+        log(ERR, "failed to start sync ", err)
+    end
+end
+
+--- Initialize the Nacos-based discovery for the current worker.
+-- Worker 0 polls Nacos (see subscribe); every other worker periodically copies
+-- the published list from the shared dict (see sync).
+-- Defined after subscribe/sync because both are file-local functions and must
+-- be declared before they can be referenced here.
+-- @param conf configuration table, see the example above
+function _M.init(conf)
+    _M.storage = conf.shenyu_storage
+    _M.balancer = balancer.new(conf.balancer_type)
+    -- nothing applied yet; sync() compares the stored revision against this
+    _M.revision = 0
+    _M.balancer_ready = false
+
+    if ngx.worker.id() == 0 then
+        _M.shenyu_instances = {}
+        _M.nacos_base_url = conf.nacos_base_url
+        -- subscribe() reads these from the module table
+        _M.username = conf.username
+        _M.password = conf.password
+        _M.namespace = conf.namespace
+        _M.service_name = conf.service_name
+        _M.group_name = conf.group_name
+        _M.clusters = conf.clusters
+
+        if not conf.service_name then
+            log(ERR, "service_name is not configured, no instance can be discovered")
+        end
+
+        -- subscribed by polling, on worker 0 only
+        local ok, err = ngx_timer_at(0, subscribe)
+        if not ok then
+            log(ERR, "failed to start watch: ", err)
+        end
+        return
+    end
+
+    -- synchronize server_list from worker 0 to the other workers
+    local ok, err = ngx_timer_at(2, sync)
+    if not ok then
+        log(ERR, "failed to start sync ", err)
+    end
+end
+
+--- Pick an upstream server from the balancer and use it as the current peer.
+-- Intended to be called from `balancer_by_lua*`.
+-- @param key value passed through to the balancer's `find`
+-- @return true on success
+-- @return nil plus an error message when no server could be picked or the
+--         peer could not be set
+function _M.pick_and_set_peer(key)
+    local server = _M.balancer:find(key)
+    if not server then
+        -- e.g. the instance list has not been loaded yet
+        log(ERR, "no shenyu instance available")
+        return nil, "no shenyu instance available"
+    end
+
+    local ok, err = ngx_balancer.set_current_peer(server)
+    if not ok then
+        log(ERR, "failed to set current peer: ", err)
+        return nil, err
+    end
+
+    return true
+end
+
+return _M
diff --git a/rockspec/shenyu-nginx-main-0.rockspec b/rockspec/shenyu-nginx-main-0.rockspec
index 07f31af..f6ef077 100644
--- a/rockspec/shenyu-nginx-main-0.rockspec
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -21,7 +21,7 @@ build = {
type = "builtin",
modules = {
["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
+ ["shenyu.register.nacos"] = "lib/shenyu/register/nacos.lua",
["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
}
}
-