Posted to notifications@shenyu.apache.org by im...@apache.org on 2022/06/10 06:06:24 UTC

[incubator-shenyu-nginx] branch main updated: support discover shenyu instances from nacos (#9)

This is an automated email from the ASF dual-hosted git repository.

impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git


The following commit(s) were added to refs/heads/main by this push:
     new 9246251  support discover shenyu instances from nacos (#9)
9246251 is described below

commit 9246251b37a0483b2b2e67f31cbcdd56bc25504a
Author: Luke.Z <10...@users.noreply.github.com>
AuthorDate: Fri Jun 10 14:06:20 2022 +0800

    support discover shenyu instances from nacos (#9)
    
    * support discover shenyu instances from nacos
    
    * update
---
 example/{ => etcd}/nginx.conf         |   4 +-
 example/{ => nacos}/nginx.conf        |  10 +-
 lib/shenyu/client.lua                 | 278 ----------------------------------
 lib/shenyu/register/nacos.lua         | 233 ++++++++++++++++++++++++++++
 rockspec/shenyu-nginx-main-0.rockspec |   2 +-
 5 files changed, 240 insertions(+), 287 deletions(-)

diff --git a/example/nginx.conf b/example/etcd/nginx.conf
similarity index 95%
copy from example/nginx.conf
copy to example/etcd/nginx.conf
index 868cfcf..ef2c410 100644
--- a/example/nginx.conf
+++ b/example/etcd/nginx.conf
@@ -30,7 +30,7 @@ http {
         register.init({
             shenyu_storage = ngx.shared.shenyu_storage,
             balancer_type = "chash",
-            etcd_base_url = "http://192.168.0.106:2379",
+            etcd_base_url = "http://127.0.0.1:2379",
         })
     }
 
@@ -49,5 +49,3 @@ http {
         }
     }
 }
-
-	
diff --git a/example/nginx.conf b/example/nacos/nginx.conf
similarity index 84%
rename from example/nginx.conf
rename to example/nacos/nginx.conf
index 868cfcf..d954917 100644
--- a/example/nginx.conf
+++ b/example/nacos/nginx.conf
@@ -26,18 +26,20 @@ http {
     lua_shared_dict shenyu_storage 1m;
 
     init_worker_by_lua_block {
-        local register = require("shenyu.register.etcd")
+        local register = require("shenyu.register.nacos")
         register.init({
             shenyu_storage = ngx.shared.shenyu_storage,
             balancer_type = "chash",
-            etcd_base_url = "http://192.168.0.106:2379",
+            nacos_base_url = "http://127.0.0.1:8848",
+            username = "nacos",
+            password = "naocs",
         })
     }
 
     upstream shenyu {
         server 0.0.0.1;
         balancer_by_lua_block {
-            require("shenyu.register.etcd").pick_and_set_peer()
+            require("shenyu.register.nacos").pick_and_set_peer()
         }
     }
 
@@ -49,5 +51,3 @@ http {
         }
     }
 }
-
-	
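For reference, the conf table documented in lib/shenyu/register/nacos.lua (added below) accepts a few fields beyond what this example sets. A minimal Lua sketch of a fuller init call for init_worker_by_lua_block, assuming Nacos auth is enabled; the namespace, service_name and group_name values are illustrative placeholders, not values mandated by the module:

    local register = require("shenyu.register.nacos")
    register.init({
        shenyu_storage = ngx.shared.shenyu_storage,
        balancer_type = "chash",
        nacos_base_url = "http://127.0.0.1:8848",
        username = "nacos",
        password = "nacos",
        namespace = "",                       -- empty string lets Nacos use its default namespace
        service_name = "shenyu-instances",    -- hypothetical service name under which ShenYu registered
        group_name = "DEFAULT_GROUP",         -- Nacos's default group, shown for completeness
    })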
diff --git a/lib/shenyu/client.lua b/lib/shenyu/client.lua
deleted file mode 100644
index a3759c4..0000000
--- a/lib/shenyu/client.lua
+++ /dev/null
@@ -1,278 +0,0 @@
-local http = require "http"
-local json = require("cjson.safe")
-local encode_base64 = ngx.encode_base64
-local decode_base64 = ngx.decode_base64
-local lrucache = require("resty.lrucache.pureffi")
-
-local ngx_time = ngx.time
-local ngx_timer_at = ngx.timer.at
-local ngx_worker_exiting = ngx.worker.exiting
-
-local re = ngx.re.match
-
-local ngx = ngx
-local log = ngx.log
-local ERR = ngx.ERR
-local WARN = ngx.WARN
-local INFO = ngx.INFO
-
-local _M = {
-  start_key = "/shenyu/register/instance/ ",
-  end_key = "/shenyu/register/instance/~",
-  revision = 0,
-}
-
--- lua_shared_dict shenyu_server_list 1m
--- local _M.server_list = ngx.shared.shenyu_server_list
-
--- conf = {
---   balance_type = "chash",
---   shenyu_server_list = {},
---   etcd_base_url = "http://127.0.0.1:2379",
--- }
-function _M.init(conf)
-  if ngx.worker.id ~= 0 then
-    return
-  end
-
-  if conf.balancer_type == "chash" then
-    local balancer = require("resty.chash")
-    _M.build_upstream_servers = function(servers)
-      log(ERR, "not support yet.")
-    end
-    _M.balancer = balancer
-  else
-    local balancer = require("resty.roundrobin")
-    _M.build_upstream_servers = function(servers)
-      balancer:reinit(servers)
-    end
-    _M.balancer = balancer
-  end
-
-  -- Start the etcd watcher
-  --local ok, err = ngx_timer_at(0, watch)
-  --if not ok then
-  --    log(ERR, "failed to start watch: " .. err)
-  --end
-end
-
-
-function pick_and_set_peer()
-  local server = _M.balancer.find()
-  balancer.set_current_peer(server)
-end
-
-
-local function parse_base_url(base_url)
-  local m, err = re(base_url, [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
-  if not m then
-    log(ERR, err)
-  end
-
-  local base_url = m[1] .. "://" .. m[2] .. ":" .. m[3]
-  return {
-    scheme = m[1],
-    host = m[2],
-    port = tonumber(m[3]),
-    base_url = base_url,
-    prefix = detect_etcd_version(base_url),
-  }
-end
-
-
--- <= 3.2 /v3alpha
--- = 3.3 /v3beta
--- >= 3.4 /v3
-local function detect_etcd_version(base_url)
-  local httpc = http.new()
-  local res, err = httpc:request_uri(base_url .. "/version")
-  if not res then
-    log(ERR, "failed to get version from etcd server.", err)
-  end
-
-  local m
-  local response = json.decode(res.body)
-  m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
-  if not m then
-    log(ERR, "failed to resolve etcd version.", err)
-  end
-
-  if tonumber(m[1]) ~= 3 then
-    log(ERR, "support etcd v3 only.")
-  end
-
-  local ver_minor = tonumber(m[2])
-  if ver_minor <= 2 then
-    return "/v3alpha"
-  elseif ver_minor == 3 then
-    return "/v3beta"
-  else
-    return "/v3"
-  end
-end
-
-
-local function fetch_shenyu_instances(etcd_conf, shenyu_server_list)
-  local server_list = shenyu_server_list
-
-  local range_request = {
-    key = encode_base64(start_key),
-    range_end = encode_base64(end_key),
-  }
-  local httpc = http.new()
-  local res, err = httpc.request_uri(etcd_conf.base_url .. etcd_conf.prefix .. "/kv/range", {
-    method = "POST",
-    body = json.encode(range_request),
-  })
-
-  if not res then
-    log(ERR, "failed to list shenyu instances from etcd", err)
-  end
-
-  -- server_list = {
-  --   ["host:port"] = {
-  --     host,
-  --     port,
-  --     ...
-  --   }
-  -- }
-  local kvs = json.decode(res.body).kvs
-  for _, kv in pairs(kvs) do
-    update_revision(kv.mod_revision)
-    server_list:set(kv.key, parse_value(kv.value))
-  end
-
-  build_server_list(server_list)
-end
-
-
-local function update_revision(mode_revision, force)
-  if force and revision > mod_revision then
-    log(ERR, "failed to update revision because the revision greater than specific")
-    return
-  end
-  revision = mod_revision
-end
-
-
-local function watch(premature, etcd_conf, watching)
-  if premature or ngx_worker_exiting() then
-    return
-  end
-
-  local httpc = http.new()
-  if not watching then
-    local etcd_conf = parse_base_url(conf.etcd_base_url)
-
-    _M.server_list = conf.shenyu_server_list
-    fetch_shenyu_instances(etcd_conf, server_list)
-    return
-  end
-
-  local ok, err = httpc:connect(etcd_conf.host, etcd_conf.port, {
-    scheme = etcd_conf.scheme,
-  })
-  if not ok then
-    -- return nil, "faliled to connect to etcd server", err
-    log(ERR, "faliled to connect to etcd server", err)
-  end
-
-  -- message WatchCreateRequest {
-  --   bytes key = 1;
-  --   bytes range_end = 2;
-  --   int64 start_revision = 3;
-  --   bool progress_notify = 4;
-  --   enum FilterType {
-  --     NOPUT = 0;
-  --     NODELETE = 1;
-  --   }
-  --   repeated FilterType filters = 5;
-  --   bool prev_kv = 6;
-  -- }
-  local request = {
-    create_request = {
-      key = encode_base64(start_key),
-      range_end = encode_base64(end_key),
-      start_revision = _M.revision,
-    }
-  }
-
-  local res, err = httpc:request({
-    path = "/v3/watch",
-    method = "POST",
-    body = json.encode(request),
-  })
-  if not res then
-    log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
-  end
-
-  local reader = res.body_reader
-  local buffer_size = 8192
-
-  repeat
-  	local buffer, err = reader(buffer_size)
-  	if err then
-      if err == "timeout" then
-        ngx.log(ngx.ERROR, "============", err)
-      end
-      ngx.log(ngx.ERROR, err)
-  	end
-
-  	if buffer then
-      print(buffer)
-      parse_watch_response(buffer)
-    end
-  until not buffer
-
-  local ok, err = ngx_timer_at(1, watch, etcd_conf, true)
-  if not ok then
-      log(ERR, "faield start watch: ", err)
-  end
-end
-
-local function parse_watch_response(response_body)
-  -- message WatchResponse {
-  --   ResponseHeader header = 1;
-  --   int64 watch_id = 2;
-  --   bool created = 3;
-  --   bool canceled = 4;
-  --   int64 compact_revision = 5;
-
-  --   repeated mvccpb.Event events = 11;
-  -- }
-  local response = json.decode(response_body)
-  local events = response.events
-
-  -- not updated
-  if not events then
-    return
-  end
-
-  local server_list = _M.shenyu_server_list
-  -- message Event {
-  --   enum EventType {
-  --     PUT = 0;
-  --     DELETE = 1;
-  --   }
-  --   EventType type = 1;
-  --   KeyValue kv = 2;
-  --   KeyValue prev_kv = 3;
-  -- }
-  for _, event in pairs(events) do
-    local kv = event.kv
-    update_revision(kv.mod_revision, true)
-
-    -- event.type: delete
-    if event.type == 1 then
-      log(INFO, "remove shenyu server instance[" .. kv.key .. "].")
-      server_list:delete(kv.key)
-    else
-      server_list:set(kv.key, 1)
-    end
-  end
-
-  build_upstream_servers(server_list)
-end
-
-
-return _M
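The removed lib/shenyu/client.lua bundled the etcd watcher and the balancing logic in a single module; that role is now covered by the per-registry modules under lib/shenyu/register (etcd, plus the nacos module added below) together with shenyu.register.balancer. A minimal sketch of the replacement etcd wiring, mirroring example/etcd/nginx.conf above (the address is a placeholder):

    -- init_worker_by_lua_block: start the etcd register
    local register = require("shenyu.register.etcd")
    register.init({
        shenyu_storage = ngx.shared.shenyu_storage,
        balancer_type = "chash",
        etcd_base_url = "http://127.0.0.1:2379",
    })

    -- balancer_by_lua_block: pick a ShenYu instance for the current request
    require("shenyu.register.etcd").pick_and_set_peer()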
diff --git a/lib/shenyu/register/nacos.lua b/lib/shenyu/register/nacos.lua
new file mode 100644
index 0000000..5f6901b
--- /dev/null
+++ b/lib/shenyu/register/nacos.lua
@@ -0,0 +1,233 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+local http = require("resty.http")
+local json = require("cjson.safe")
+local ngx_balancer = require("ngx.balancer")
+
+local balancer = require("shenyu.register.balancer")
+
+local ngx = ngx
+
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+
+
+local log = ngx.log
+local ERR = ngx.ERR
+local INFO = ngx.INFO
+
+_M.access_token = nil
+
+-- forward declarations so _M.init can hand the timer callbacks defined below to ngx.timer.at
+local subscribe, sync
+
+local function login(username, password)
+    local httpc = http.new()
+    local res, err = httpc:request_uri(_M.nacos_base_url, {
+        method = "POST",
+        path = "/nacos/v1/auth/login",
+        query = "username=" .. username .. "&password=" .. password,
+    })
+    if not res then
+        return nil, err
+    end
+
+    if res.status >= 400 then
+        return nil, res.body
+    end
+
+    return json.decode(res.body).accessToken
+end
+
+local function get_server_list(serviceName, groupName, namespaceId, clusters)
+    if not namespaceId then
+        namespaceId = ""
+    end
+    if not groupName then
+        groupName = ""
+    end
+    if not clusters then
+        clusters = ""
+    end
+
+    local server_list = {}
+    local httpc = http.new()
+    local res, err = httpc:request_uri(_M.nacos_base_url, {
+        method = "GET",
+        path = "/nacos/v1/ns/instance/list",
+        query = "serviceName=" .. serviceName ..
+                "&groupName=" .. groupName ..
+                "&namespaceId=" .. namespaceId ..
+                "&clusters=" .. clusters ..
+                "&healthOnly=true",
+        headers = {
+            ["accessToken"] = _M.access_token,
+        }
+    })
+
+    if not res then
+        return nil, nil, err
+    end
+
+    if res.status == 200 then
+        local list_inst_resp = json.decode(res.body)
+
+        local hosts = list_inst_resp.hosts
+        for _, inst in ipairs(hosts) do
+            -- key by "ip:port" so the balancer result can be passed to set_current_peer
+            server_list[inst.ip .. ":" .. inst.port] = inst.weight
+        end
+        server_list["_length_"] = #hosts
+
+        return server_list, list_inst_resp.lastRefTime
+    end
+    return nil, nil, res.body
+end
+
+-- conf = {
+--   balancer_type = "chash",
+--   nacos_base_url = "http://127.0.0.1:8848",
+--   username = "nacos",
+--   password = "nacos",
+--   namespace = "",
+--   service_name = "",
+--   group_name = "",
+-- }
+function _M.init(conf)
+    _M.storage = conf.shenyu_storage
+    _M.balancer = balancer.new(conf.balancer_type)
+    _M.nacos_base_url = conf.nacos_base_url
+    _M.username = conf.username
+    _M.password = conf.password
+    _M.namespace = conf.namespace
+    _M.service_name = conf.service_name
+    _M.group_name = conf.group_name
+    _M.revision = 0
+
+    if ngx.worker.id() == 0 then
+        _M.shenyu_instances = {}
+
+        -- subscribed by polling, privileged
+        local ok, err = ngx_timer_at(0, subscribe)
+        if not ok then
+            log(ERR, "failed to start watch: " .. err)
+        end
+        return
+    end
+
+    -- synchronize server_list from privileged processor to workers
+    local ok, err = ngx_timer_at(2, sync)
+    if not ok then
+        log(ERR, "failed to start sync ", err)
+    end
+end
+
+subscribe = function(premature, initialized)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    if not initialized then
+        local token, err = login(_M.username, _M.password)
+        if not token then
+            log(ERR, err)
+            goto continue
+        end
+        _M.access_token = token
+
+        local server_list, revision, err = get_server_list(_M.service_name, _M.group_name, _M.namespace, _M.clusters)
+        if not server_list then
+            log(ERR, "", err)
+            goto continue
+        end
+        local servers_length = server_list["_length_"]
+        server_list["_length_"] = nil
+        _M.servers_length = servers_length
+
+        _M.balancer:init(server_list)
+        _M.revision = revision
+
+        local server_list_in_json = json.encode(server_list)
+        _M.storage:set("server_list", server_list_in_json)
+        _M.storage:set("revision", revision)
+
+        initialized = true
+    else
+        local server_list, revision, err = get_server_list(_M.service_name, _M.group_name, _M.namespace, _M.clusters)
+        if not server_list then
+            log(ERR, "", err)
+            goto continue
+        end
+
+        local updated = false
+        local servers_length = server_list["_length_"]
+        server_list["_length_"] = nil
+
+        if _M.servers_length == servers_length then
+            local services = json.decode(_M.storage:get("server_list")) -- previous snapshot
+            for srv, weight in pairs(server_list) do
+                if services[srv] ~= weight then
+                    updated = true
+                    break
+                end
+            end
+        else
+            updated = true
+        end
+
+        if not updated then
+            goto continue
+        end
+
+        _M.balancer:reinit(server_list)
+        _M.revision = revision
+
+        local server_list_in_json = json.encode(server_list)
+        _M.storage:set("server_list", server_list_in_json)
+        _M.storage:set("revision", revision)
+    end
+
+    :: continue ::
+    local ok, err = ngx_timer_at(2, subscribe, initialized)
+    if not ok then
+        log(ERR, "failed to subscribe: ", err)
+    end
+    return
+end
+
+sync = function(premature)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    local storage = _M.storage
+    local ver = storage:get("revision")
+
+    if ver and ver > _M.revision then
+        local server_list = storage:get("server_list")
+        local servers = json.decode(server_list)
+        if _M.revision <= 1 then
+            _M.balancer:init(servers)
+        else
+            _M.balancer:reinit(servers)
+        end
+        _M.revision = ver
+    end
+
+    local ok, err = ngx_timer_at(2, sync)
+    if not ok then
+        log(ERR, "failed to start sync ", err)
+    end
+end
+
+function _M.pick_and_set_peer(key)
+    local server = _M.balancer:find(key)
+    ngx_balancer.set_current_peer(server)
+end
+
+return _M
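Note that pick_and_set_peer accepts an optional key that is handed to the balancer's find; with balancer_type = "chash" a stable per-request key determines which ShenYu instance a request maps to. A small sketch for balancer_by_lua_block, assuming the client address is an acceptable hash key (the choice of key is illustrative, not required by the module):

    -- balancer_by_lua_block: consistent-hash on the client address
    local nacos = require("shenyu.register.nacos")
    nacos.pick_and_set_peer(ngx.var.remote_addr)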
diff --git a/rockspec/shenyu-nginx-main-0.rockspec b/rockspec/shenyu-nginx-main-0.rockspec
index 07f31af..f6ef077 100644
--- a/rockspec/shenyu-nginx-main-0.rockspec
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -21,7 +21,7 @@ build = {
    type = "builtin",
    modules = {
       ["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
+      ["shenyu.register.nacos"] = "lib/shenyu/register/nacos.lua",
       ["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
    }
 }
-
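Once the rock is built, the new module resolves like the existing etcd one. A quick sanity check from any Lua phase, assuming the rockspec above has been installed (for example with luarocks make) and lua_package_path can see it:

    -- verify that the nacos register module can be loaded
    local ok, register_or_err = pcall(require, "shenyu.register.nacos")
    if not ok then
        ngx.log(ngx.ERR, "shenyu.register.nacos not found: ", register_or_err)
    end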