You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by im...@apache.org on 2022/07/02 09:26:36 UTC
[incubator-shenyu-nginx] branch main updated: 1. commit zookeeper connection (#10)
This is an automated email from the ASF dual-hosted git repository.
impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git
The following commit(s) were added to refs/heads/main by this push:
new eb1bbc8 1. commit zookeeper connection (#10)
eb1bbc8 is described below
commit eb1bbc8a69f30ad22d1e4e7a97e0bfd40ba9b9e2
Author: Sixh-PrFor <ch...@163.com>
AuthorDate: Sat Jul 2 17:26:33 2022 +0800
1. commit zookeeper connection (#10)
* 1. commit zookeeper connection
* Increase the processing of heartbeats
* 1. Send a request to subscribe
* 1. zookeeper watch event...
* 1. zookeeper watch event...
* 1. update zookeeper proto.
* 1. update zookeeper proto.
* update zookeeper proto.
* update zookeeper proto.
* Modify Chinese
* Modify Chinese
* Modify Chinese
---
.gitignore | 3 +
example/zookeeper/nginx.conf | 55 +++++++
lib/shenyu/register/balancer.lua | 2 -
lib/shenyu/register/core/string.lua | 31 ++++
lib/shenyu/register/core/struct.lua | 209 +++++++++++++++++++++++++++
lib/shenyu/register/core/utils.lua | 39 +++++
lib/shenyu/register/etcd.lua | 46 +++---
lib/shenyu/register/zookeeper.lua | 95 ++++++++++++
lib/shenyu/register/zookeeper/connection.lua | 127 ++++++++++++++++
lib/shenyu/register/zookeeper/zk_client.lua | 186 ++++++++++++++++++++++++
lib/shenyu/register/zookeeper/zk_cluster.lua | 86 +++++++++++
lib/shenyu/register/zookeeper/zk_const.lua | 85 +++++++++++
lib/shenyu/register/zookeeper/zk_proto.lua | 157 ++++++++++++++++++++
rockspec/shenyu-nginx-main-0.rockspec | 10 ++
14 files changed, 1111 insertions(+), 20 deletions(-)
diff --git a/.gitignore b/.gitignore
index 276ba56..af1c99a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,6 @@ logs/
*.diff
*.patch
*.tmp
+
+/out/
+/.vscode/
diff --git a/example/zookeeper/nginx.conf b/example/zookeeper/nginx.conf
new file mode 100644
index 0000000..a53b238
--- /dev/null
+++ b/example/zookeeper/nginx.conf
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+worker_processes 2;
+daemon off;
+error_log /dev/stdout debug;
+
+events {
+ worker_connections 1024;
+}
+http {
+ lua_shared_dict shenyu_storage 1m;
+
+# lua_package_path "$prefix/lib/?.lua;;";
+
+ init_worker_by_lua_block {
+ local register = require("shenyu.register.zookeeper")
+ register.init({
+ servers = {"127.0.0.1:2181"},
+ shenyu_storage = ngx.shared.shenyu_storage,
+ balancer_type = "chash"
+ });
+ }
+
+ upstream shenyu {
+ server 0.0.0.1;
+ balancer_by_lua_block {
+ require("shenyu.register.zookeeper").pick_and_set_peer()
+ }
+ }
+
+ server {
+ listen 80;
+
+ location ~ /* {
+ proxy_pass http://shenyu;
+ }
+ }
+}
+
+
diff --git a/lib/shenyu/register/balancer.lua b/lib/shenyu/register/balancer.lua
index 39d37ad..5407bc8 100644
--- a/lib/shenyu/register/balancer.lua
+++ b/lib/shenyu/register/balancer.lua
@@ -14,7 +14,6 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-
local _M = {}
local str_null = string.char(0)
@@ -25,7 +24,6 @@ function _M.new(balancer_type)
local servers, nodes = {}, {}
for serv, weight in pairs(server_list) do
local id = string.gsub(serv, ":", str_null)
-
servers[id] = serv
nodes[id] = weight
end
diff --git a/lib/shenyu/register/core/string.lua b/lib/shenyu/register/core/string.lua
new file mode 100644
index 0000000..089b4b5
--- /dev/null
+++ b/lib/shenyu/register/core/string.lua
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local table_insert = table.insert
+local _M ={}
+
+
+function _M.split(str, delimiter)
+ if not str or str == "" then return {} end
+ if not delimiter or delimiter == "" then return { str } end
+ local result = {}
+ for match in (str .. delimiter):gmatch("(.-)" .. delimiter) do
+ table_insert(result, match)
+ end
+ return result
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/shenyu/register/core/struct.lua b/lib/shenyu/register/core/struct.lua
new file mode 100644
index 0000000..d26d350
--- /dev/null
+++ b/lib/shenyu/register/core/struct.lua
@@ -0,0 +1,209 @@
+--[[
+ * Copyright (c) 2015-2020 Iryont <https://github.com/iryont/lua-struct>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+]]
+local unpack = table.unpack or _G.unpack
+local struct = {}
+
+function struct.pack(format, ...)
+ local stream = {}
+ local vars = {...}
+ local endianness = true
+
+ for i = 1, format:len() do
+ local opt = format:sub(i, i)
+
+ if opt == "<" then
+ endianness = true
+ elseif opt == ">" then
+ endianness = false
+ elseif opt:find("[bBhHiIlL]") then
+ local n = opt:find("[hH]") and 2 or opt:find("[iI]") and 4 or opt:find("[lL]") and 8 or 1
+ local val = tonumber(table.remove(vars, 1))
+
+ local bytes = {}
+ for j = 1, n do
+ table.insert(bytes, string.char(val % (2 ^ 8)))
+ val = math.floor(val / (2 ^ 8))
+ end
+
+ if not endianness then
+ table.insert(stream, string.reverse(table.concat(bytes)))
+ else
+ table.insert(stream, table.concat(bytes))
+ end
+ elseif opt:find("[fd]") then
+ local val = tonumber(table.remove(vars, 1))
+ local sign = 0
+
+ if val < 0 then
+ sign = 1
+ val = -val
+ end
+
+ local mantissa, exponent = math.frexp(val)
+ if val == 0 then
+ mantissa = 0
+ exponent = 0
+ else
+ mantissa = (mantissa * 2 - 1) * math.ldexp(0.5, (opt == "d") and 53 or 24)
+ exponent = exponent + ((opt == "d") and 1022 or 126)
+ end
+
+ local bytes = {}
+ if opt == "d" then
+ val = mantissa
+ for i = 1, 6 do
+ table.insert(bytes, string.char(math.floor(val) % (2 ^ 8)))
+ val = math.floor(val / (2 ^ 8))
+ end
+ else
+ table.insert(bytes, string.char(math.floor(mantissa) % (2 ^ 8)))
+ val = math.floor(mantissa / (2 ^ 8))
+ table.insert(bytes, string.char(math.floor(val) % (2 ^ 8)))
+ val = math.floor(val / (2 ^ 8))
+ end
+
+ table.insert(bytes, string.char(math.floor(exponent * ((opt == "d") and 16 or 128) + val) % (2 ^ 8)))
+ val = math.floor((exponent * ((opt == "d") and 16 or 128) + val) / (2 ^ 8))
+ table.insert(bytes, string.char(math.floor(sign * 128 + val) % (2 ^ 8)))
+ val = math.floor((sign * 128 + val) / (2 ^ 8))
+
+ if not endianness then
+ table.insert(stream, string.reverse(table.concat(bytes)))
+ else
+ table.insert(stream, table.concat(bytes))
+ end
+ elseif opt == "s" then
+ table.insert(stream, tostring(table.remove(vars, 1)))
+ table.insert(stream, string.char(0))
+ elseif opt == "c" then
+ local n = format:sub(i + 1):match("%d+")
+ local str = tostring(table.remove(vars, 1))
+ local len = tonumber(n)
+ if len <= 0 then
+ len = str:len()
+ end
+ if len - str:len() > 0 then
+ str = str .. string.rep(" ", len - str:len())
+ end
+ table.insert(stream, str:sub(1, len))
+ i = i + n:len()
+ end
+ end
+
+ return table.concat(stream)
+end
+
+function struct.unpack(format, stream, pos)
+ local vars = {}
+ local iterator = pos or 1
+ local endianness = true
+
+ for i = 1, format:len() do
+ local opt = format:sub(i, i)
+
+ if opt == "<" then
+ endianness = true
+ elseif opt == ">" then
+ endianness = false
+ elseif opt:find("[bBhHiIlL]") then
+ local n = opt:find("[hH]") and 2 or opt:find("[iI]") and 4 or opt:find("[lL]") and 8 or 1
+ local signed = opt:lower() == opt
+
+ local val = 0
+ for j = 1, n do
+ local byte = string.byte(stream:sub(iterator, iterator))
+ if endianness then
+ val = val + byte * (2 ^ ((j - 1) * 8))
+ else
+ val = val + byte * (2 ^ ((n - j) * 8))
+ end
+ iterator = iterator + 1
+ end
+
+ if signed and val >= 2 ^ (n * 8 - 1) then
+ val = val - 2 ^ (n * 8)
+ end
+
+ table.insert(vars, math.floor(val))
+ elseif opt:find("[fd]") then
+ local n = (opt == "d") and 8 or 4
+ local x = stream:sub(iterator, iterator + n - 1)
+ iterator = iterator + n
+
+ if not endianness then
+ x = string.reverse(x)
+ end
+
+ local sign = 1
+ local mantissa = string.byte(x, (opt == "d") and 7 or 3) % ((opt == "d") and 16 or 128)
+ for i = n - 2, 1, -1 do
+ mantissa = mantissa * (2 ^ 8) + string.byte(x, i)
+ end
+
+ if string.byte(x, n) > 127 then
+ sign = -1
+ end
+
+ local exponent =
+ (string.byte(x, n) % 128) * ((opt == "d") and 16 or 2) +
+ math.floor(string.byte(x, n - 1) / ((opt == "d") and 16 or 128))
+ if exponent == 0 then
+ table.insert(vars, 0.0)
+ else
+ mantissa = (math.ldexp(mantissa, (opt == "d") and -52 or -23) + 1) * sign
+ table.insert(vars, math.ldexp(mantissa, exponent - ((opt == "d") and 1023 or 127)))
+ end
+ elseif opt == "s" then
+ local bytes = {}
+ for j = iterator, stream:len() do
+ if stream:sub(j, j) == string.char(0) or stream:sub(j) == "" then
+ break
+ end
+
+ table.insert(bytes, stream:sub(j, j))
+ end
+
+ local str = table.concat(bytes)
+ iterator = iterator + str:len() + 1
+ table.insert(vars, str)
+ elseif opt == "c" then
+ local n = format:sub(i + 1):match("%d+")
+ local len = tonumber(n)
+ if len <= 0 then
+ len = table.remove(vars)
+ end
+
+ table.insert(vars, stream:sub(iterator, iterator + len - 1))
+ iterator = iterator + len
+ i = i + n:len()
+ end
+ end
+
+ return vars, iterator
+end
+
+function struct.tbunpack(vars)
+ -- body
+ return unpack(vars)
+end
+
+return struct
diff --git a/lib/shenyu/register/core/utils.lua b/lib/shenyu/register/core/utils.lua
new file mode 100644
index 0000000..5f74206
--- /dev/null
+++ b/lib/shenyu/register/core/utils.lua
@@ -0,0 +1,39 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local str = require("shenyu.register.core.string")
+local _M = {}
+
+--Delimited string
+function _M.paras_host(host, delimiter)
+ return str.split(host, delimiter)
+end
+
+function _M.long_to_hex_string(long)
+ return string.format("0X%06X", long)
+end
+
+-- table len
+function _M.table_len(args)
+ -- body
+ local n = 0
+ if args then
+ n = #args
+ end
+ return n
+end
+
+return _M
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
index 4fd7c4f..da20d13 100644
--- a/lib/shenyu/register/etcd.lua
+++ b/lib/shenyu/register/etcd.lua
@@ -83,7 +83,7 @@ local function parse_base_url(base_url)
host = m[2],
port = tonumber(m[3]),
base_url = base_url,
- prefix = detect_etcd_version(base_url),
+ prefix = detect_etcd_version(base_url)
}
end
@@ -95,14 +95,18 @@ end
local function fetch_shenyu_instances(conf)
local range_request = {
key = encode_base64(_M.start_key),
- range_end = encode_base64(_M.end_key),
+ range_end = encode_base64(_M.end_key)
}
local httpc = http.new()
- local res, err = httpc:request_uri(conf.base_url .. conf.prefix .. "/kv/range", {
- method = "POST",
- body = json.encode(range_request),
- })
+ local res, err =
+ httpc:request_uri(
+ conf.base_url .. conf.prefix .. "/kv/range",
+ {
+ method = "POST",
+ body = json.encode(range_request)
+ }
+ )
if not res then
return nil, "failed to list shenyu instances from etcd, " .. (err or "unknown")
end
@@ -267,11 +271,14 @@ local function watch(premature, watching)
else
local conf = _M.etcd_conf
local httpc = http.new()
- local ok, err = httpc:connect({
- scheme = conf.scheme,
- host = conf.host,
- port = tonumber(conf.port),
- })
+ local ok, err =
+ httpc:connect(
+ {
+ scheme = conf.scheme,
+ host = conf.host,
+ port = tonumber(conf.port)
+ }
+ )
if not ok then
log(ERR, "failed to connect to etcd server", err)
_M.time_at = 3
@@ -292,15 +299,18 @@ local function watch(premature, watching)
create_request = {
key = encode_base64(_M.start_key),
range_end = encode_base64(_M.end_key),
- start_revision = _M.revision,
+ start_revision = _M.revision
}
}
- local res, err = httpc:request({
- path = "/v3/watch",
- method = "POST",
- body = json.encode(request),
- })
+ local res, err =
+ httpc:request(
+ {
+ path = "/v3/watch",
+ method = "POST",
+ body = json.encode(request)
+ }
+ )
if not res then
log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
_M.time_at = 3
@@ -330,7 +340,7 @@ local function watch(premature, watching)
end
end
- :: continue ::
+ ::continue::
local ok, err = ngx_timer_at(_M.time_at, watch, watching)
if not ok then
log(ERR, "failed to start watch: ", err)
diff --git a/lib/shenyu/register/zookeeper.lua b/lib/shenyu/register/zookeeper.lua
new file mode 100644
index 0000000..b7a3518
--- /dev/null
+++ b/lib/shenyu/register/zookeeper.lua
@@ -0,0 +1,95 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local zk_cluster = require("shenyu.register.zookeeper.zk_cluster")
+local ngx_balancer = require("ngx.balancer")
+local balancer = require("shenyu.register.balancer")
+local const = require("shenyu.register.zookeeper.zk_const")
+local ngx_timer_at = ngx.timer.at
+local xpcall = xpcall
+local ngx_log = ngx.log
+local math = string.match
+local zc
+local _M = {
+ isload = 0,
+ nodes = {}
+}
+
+local function watch_data(data)
+ local server_lists = {}
+ -- body
+ for index, value in ipairs(data) do
+ if math(value, ":") then
+ server_lists[value] = 1
+ end
+ end
+ local s_nodes = _M.nodes
+ for host, index in pairs(server_lists) do
+ if not s_nodes[host] then
+ ngx_log(ngx.INFO, "add shenyu server:" .. host)
+ end
+ end
+ for host, index in pairs(s_nodes) do
+ if not server_lists[host] then
+ ngx_log(ngx.INFO, "remove shenyu server:" .. host)
+ end
+ end
+ if (_M.isload > 1) then
+ _M.balancer:reinit(server_lists)
+ else
+ _M.balancer:init(server_lists)
+ end
+ _M.isload = _M.isload + 1
+ _M.nodes = server_lists
+end
+
+local function watch(premature, path)
+ local ok, err = zc:connect()
+ if ok then
+ ok, err =
+ xpcall(
+ zc:add_watch(path, watch_data),
+ function(err)
+ ngx_log(ngx.ERR, "zookeeper start watch error..." .. tostring(err))
+ end
+ )
+ end
+ return ok, err
+end
+
+function _M.init(config)
+ _M.storage = config.shenyu_storage
+ _M.balancer = balancer.new(config.balancer_type)
+ zc = zk_cluster:new(config)
+ if ngx.worker.id() == 0 then
+ -- Start the zookeeper watcher
+ local ok, err = ngx_timer_at(2, watch, const.ZK_WATCH_PATH)
+ if not ok then
+ ngx_log(ngx.ERR, "failed to start watch: " .. err)
+ end
+ return
+ end
+end
+
+function _M.pick_and_set_peer(key)
+ local server = _M.balancer:find(key)
+ if not server then
+ ngx_log(ngx.ERR, "not find shenyu server..")
+ return
+ end
+ ngx_balancer.set_current_peer(server)
+end
+return _M
diff --git a/lib/shenyu/register/zookeeper/connection.lua b/lib/shenyu/register/zookeeper/connection.lua
new file mode 100644
index 0000000..4076f91
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/connection.lua
@@ -0,0 +1,127 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local proto = require("shenyu.register.zookeeper.zk_proto")
+local struct = require("shenyu.register.core.struct")
+local tcp = ngx.socket.tcp
+local pack = struct.pack
+local unpack = struct.unpack
+local tbunpack = struct.tbunpack
+local ngx_log = ngx.log
+local _timeout = 60 * 1000
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self)
+ local sock, err = tcp()
+ if not tcp then
+ return nil, err
+ end
+ return setmetatable({sock = sock, timeout = _timeout}, mt)
+end
+
+function _M.connect(self, ip, port)
+ -- body
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized tcp."
+ end
+ local ok, err = sock:connect(ip, port)
+ if not ok then
+ ngx_log(ngx.DEBUG, "connect host:" .. ip .. err)
+ return ok, err
+ end
+ return ok, nil
+end
+
+function _M.write(self, req)
+ -- body
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized tpc."
+ end
+ return sock:send(req)
+end
+
+function _M.read(self, len)
+ -- body
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized tpc."
+ end
+ return sock:receive(len)
+end
+
+function _M.read_len(self)
+ local b, err = self:read(4)
+ if not b then
+ return nil, "error"
+ end
+ local len = tbunpack(unpack(">i", b))
+ return len
+end
+
+function _M.read_headler(self)
+ local len = self:read_len()
+ local b, err = self:read(len)
+ if not b then
+ return nil, "error"
+ end
+ local h, end_index = proto.reply_header:unpack(b, 1)
+ if not h then
+ return nil, nil, 0
+ end
+ return h, b, end_index
+end
+
+function _M.close(self)
+ -- body
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized tpc."
+ end
+ sock.close()
+end
+
+function _M.set_timeout(self, timeout)
+ -- body
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized"
+ end
+ sock:settimeout(timeout)
+end
+
+function _M.set_keepalive(self, ...)
+ local sock = self.sock
+ if not sock then
+ return nil, "not initialized"
+ end
+
+ return sock:setkeepalive(...)
+end
+
+function _M.set_timeouts(self, connect_timeout, send_timeout, read_timeout)
+ local sock = self.sock
+ if not sock then
+ error("not initialized", 2)
+ return
+ end
+
+ sock:settimeouts(connect_timeout, send_timeout, read_timeout)
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_client.lua b/lib/shenyu/register/zookeeper/zk_client.lua
new file mode 100644
index 0000000..ceb490b
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_client.lua
@@ -0,0 +1,186 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local util = require("shenyu.register.core.utils")
+local const = require("shenyu.register.zookeeper.zk_const")
+local proto = require("shenyu.register.zookeeper.zk_proto")
+local connection = require("shenyu.register.zookeeper.connection")
+local ngx_log = ngx.log
+local now = ngx.now
+local exiting = ngx.worker.exiting
+local sleep = ngx.sleep
+local strlen = string.len
+local _timeout = 60 * 1000
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self)
+ local conn_, err = connection:new()
+ if not conn_ then
+ return nil, "initialized connection error" .. err
+ end
+ conn_:set_timeout(_timeout)
+ conn_:set_keepalive()
+ return setmetatable({conn = conn_}, mt)
+end
+
+function _M.connect(self, host)
+ -- body
+ local conn = self.conn
+ local iptables = util.paras_host(host, ":")
+ local ip = iptables[1]
+ local port = iptables[2]
+ local byt = conn:connect(ip, port)
+ if not byt then
+ return nil, "connection error" .. host
+ end
+ local bytes, err = proto:serialize(proto.request_header, proto.connect_request)
+ local b, err = conn:write(bytes)
+ if not b then
+ return nil, "connect error " .. ip + ":" .. port
+ end
+ local len = conn:read_len()
+ if not len then
+ return nil, "error"
+ end
+ local bytes = conn:read(len)
+ if not bytes then
+ return nil, "connection read error"
+ end
+ local rsp = proto.connect_response:unpack(bytes, 1)
+ if not rsp then
+ return nil, "error"
+ end
+ self.xid = 0
+ local t = rsp.timeout
+ self.session_timeout = rsp.timeout
+ self.ping_time = (t / 3) / 1000
+ self.host = host
+ self.session_id = rsp.session_id
+ local tostring =
+ "proto_ver:" ..
+ rsp.proto_ver ..
+ "," .. "timeout:" .. rsp.timeout .. "," .. "session_id:" .. util.long_to_hex_string(rsp.session_id)
+ ngx_log(ngx.INFO, tostring)
+ return true, nil
+end
+
+function _M.get_children(self, path)
+ return self:_get_children(path, 0)
+end
+
+function _M._get_children(self, path, is_watch)
+ local conn = self.conn
+ if not conn then
+ return nil, "not initialized connection"
+ end
+ local xid = self.xid + 1
+ local h = proto.request_header
+ h.xid = xid
+ h.type = const.ZOO_GET_CHILDREN
+ local r = proto.get_children_request
+ r.path = path
+ r.watch = is_watch
+ local req = proto:serialize(h, r)
+ local bytes, err = conn:write(req)
+ if not bytes then
+ return bytes, "write bytes error"
+ end
+ -- If other data is received, it means that the data of the _get_children command has not been received
+ ::continue::
+ local rsp_header, bytes, end_index = conn:read_headler()
+ if not rsp_header then
+ return nil, "read headler error"
+ end
+ if rsp_header.err ~= 0 then
+ ngx_log(ngx.ERR, "zookeeper remote error: " .. const.get_err_msg(rsp_header.err) .. "," .. path)
+ return nil, const.get_err_msg(rsp_header.err)
+ end
+ if strlen(bytes) > 16 and rsp_header.xid > 0 then
+ self.xid = rsp_header.xid + 1
+ local get_children_response = proto.get_children_response:unpack(bytes, end_index)
+ return {
+ xid = rsp_header.xid,
+ zxid = rsp_header.zxid,
+ path = get_children_response.paths
+ }
+ end
+ if rsp_header.xid == const.XID_PING then
+ goto continue
+ end
+ return nil, "get_children error"
+end
+
+function _M.add_watch(self, path)
+ -- body
+ local d, e = self:_get_children(path, 1)
+ if not d then
+ return d, e
+ end
+ self.watch = true
+ return d, nil
+end
+
+local function reply_read(self, callback)
+ local conn = self.conn
+ local h = proto.request_header
+ h.xid = const.XID_PING
+ h.type = const.ZOO_PING_OP
+ local req = proto:serialize(h, proto.ping_request)
+ local ok, err = conn:write(req)
+ if ok then
+ local h, bytes, end_start = conn:read_headler()
+ if h.xid == const.XID_PING then
+ ngx_log(
+ ngx.DEBUG,
+ "Got ping zookeeper response host:" ..
+ self.host .. " for sessionId:" .. util.long_to_hex_string(self.session_id)
+ )
+ elseif h.xid == const.XID_WATCH_EVENT then
+ --decoding
+ local watch_event = proto.watch_event:unpack(bytes, end_start)
+ -- local xid, done, err, type, state = unpack(">iliii", bytes)
+ -- local eventPath = unpack_strings(strsub(bytes, 25))
+ local t = watch_event.paths[1]
+ local d, e = self:add_watch("" .. t)
+ if d then
+ callback(d.path)
+ end
+ end
+ end
+ return ok, err
+end
+
+function _M.watch_receive(self, callback)
+ local last_time = 0
+ while true do
+ if exiting() then
+ self.conn.close()
+ return true
+ end
+ local can_ping = now() - last_time > self.ping_time
+ if can_ping then
+ local ok, err = reply_read(self, callback)
+ if err then
+ return nil, err
+ end
+ last_time = now()
+ end
+ sleep(0.2)
+ end
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_cluster.lua b/lib/shenyu/register/zookeeper/zk_cluster.lua
new file mode 100644
index 0000000..d13f4f3
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_cluster.lua
@@ -0,0 +1,86 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations un
+local zkclient = require("shenyu.register.zookeeper.zk_client")
+local ngx_log = ngx.log
+local ipairs = ipairs
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self, zk_config)
+ -- body
+ return setmetatable({servers = zk_config.servers}, mt)
+end
+
+function _M.connect(self)
+ local servers = self.servers
+ if not servers then
+ return nil, "servers is null"
+ end
+ -- initialize
+ local client, err = zkclient:new()
+ if not client then
+ ngx_log(ngx.ERR, "Failed to initialize zk Client" .. err)
+ return nil, err
+ end
+ for _, _host in ipairs(servers) do
+ ngx_log(ngx.INFO, "try to connect to zookeeper host : " .. _host)
+ local ok, err = client:connect(_host)
+ if not ok then
+ ngx_log(ngx.INFO, "Failed to connect to zookeeper host : " .. _host .. err)
+ else
+ ngx_log(ngx.INFO, "Successful connection to zookeeper host : " .. _host)
+ self.client = client
+ return client
+ end
+ end
+ ngx_log(ngx.ERR, "Failed to connect to zookeeper")
+ return nil
+end
+
+function _M.get_children(self, path)
+ local client = self.client
+ if not client then
+ ngx_log(ngx.ERR, "conn not initialized")
+ end
+ local data, error = client:get_children(path)
+ if not data then
+ return nil, error
+ end
+ return data, nil
+end
+
+local function _watch_receive(self, callback)
+ local client = self.client
+ if not client then
+ ngx_log(ngx.ERR, "conn not initialized")
+ end
+ return client:watch_receive(callback)
+end
+
+function _M.add_watch(self, path, callback)
+ local client = self.client
+ if not client then
+ ngx_log(ngx.ERR, "conn not initialized")
+ end
+ local data, err = client:add_watch(path)
+ if data then
+ callback(data.path)
+ return _watch_receive(self, callback)
+ end
+ return data, err
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_const.lua b/lib/shenyu/register/zookeeper/zk_const.lua
new file mode 100644
index 0000000..2aa7e3a
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_const.lua
@@ -0,0 +1,85 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"), you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+-- XID
+_M.XID_WATCH_EVENT = -1
+
+_M.XID_PING = -2
+
+_M.XID_SET_WATCHES = -8
+
+--op code
+
+_M.ZOO_GET_CHILDREN = 8
+
+_M.ZOO_PING_OP = 11
+
+_M.ZOO_GET_CHILDREN2 = 12
+
+_M.ZOO_SET_WATCHES = 101
+
+_M.ZOO_ADD_WATCH = 106
+
+_M.ZK_WATCH_PATH = "/shenyu/register/instance";
+
+--Definition of error codes.
+local error_code = {
+ "0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-100",
+ "-101", "-102", "-103", "-108", "-110", "-111", "-112",
+ "-113", "-114", "-115", "-118", "-119" }
+
+
+--errorcode
+local error_msg = {
+ err0 = "Ok",
+ err1 = "System error",
+ err2 = "Runtime inconsistency",
+ err3 = "Data inconsistency",
+ err4 = "Connection loss",
+ err5 = "Marshalling error",
+ err6 = "Operation is unimplemented",
+ err7 = "Operation timeout",
+ err8 = "Invalid arguments",
+ err9 = "API errors.",
+ err101 = "Node does not exist",
+ err102 = "Not authenticated",
+ err103 = "Version conflict",
+ err108 = "Ephemeral nodes may not have children",
+ err110 = "The node already exists",
+ err111 = "The node has children",
+ err112 = "The session has been expired by the server",
+ err113 = "Invalid callback specified",
+ err114 = "Invalid ACL specified",
+ err115 = "Client authentication failed",
+ err118 = "Session moved to another server, so operation is ignored",
+ err119 = "State-changing request is passed to read-only server",
+}
+for i = 1, #error_code do
+ local cmd = "err" .. (error_code[i] * -1)
+ _M[cmd] = error_msg.cmd
+end
+
+function _M.get_err_msg(code)
+ if not code then
+ return "unknown"
+ end
+ return error_msg["err" .. (code * -1)]
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/shenyu/register/zookeeper/zk_proto.lua b/lib/shenyu/register/zookeeper/zk_proto.lua
new file mode 100644
index 0000000..2eacefe
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_proto.lua
@@ -0,0 +1,157 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local struct = require("shenyu.register.core.struct")
+local pack = struct.pack
+local unpack = struct.unpack
+local tbunpack = struct.tbunpack
+local strbyte = string.byte
+local strlen = string.len
+local strsub = string.sub
+
+local _M = {}
+
+local _base = {
+ new = function(self, o)
+ o = o or {}
+ setmetatable(o, self)
+ return o
+ end
+}
+
+local _request_header =
+ _base:new {
+ xid = 0,
+ type = 0,
+ pack = function(self)
+ return pack(">II", self.xid, self.type)
+ end
+}
+
+local _get_children_request =
+ _base:new {
+ path = "",
+ watch = 0,
+ pack = function(self)
+ local path_len = strlen(self.path)
+ return pack(">ic" .. path_len .. "b", path_len, self.path, strbyte(self.watch))
+ end
+}
+
+local _ping_request =
+ _base:new {
+ pack = function(self)
+ local stream = {}
+ return table.concat(stream)
+ end
+}
+
+local _connect_request =
+ _base:new {
+ protocol_version = 0,
+ last_zxid_seen = 0,
+ timeout = 0,
+ session_id = 0,
+ password = "",
+ pack = function(self)
+ return pack(">lilic16", 0, 0, 0, 0, "")
+ end
+}
+
+function _M.serialize(self, h, request)
+ local h_bytes = h:pack()
+ local r_bytes = request:pack()
+ local len = h_bytes:len() + r_bytes:len()
+ local len_bytes = pack(">i", len)
+ return len_bytes .. h_bytes .. r_bytes
+end
+
+_M.get_children_request = _get_children_request
+_M.request_header = _request_header
+_M.ping_request = _ping_request
+_M.connect_request = _connect_request
+
+--basics of response.
+local _reply_header =
+ _base:new {
+ xid = 0, -- int
+ zxid = 0, -- long
+ err = 0, -- int
+ unpack = function(self, bytes, start_index)
+ local vars, end_index = unpack(">ili", bytes, start_index)
+ self.xid, self.zxid, self.err = tbunpack(vars)
+ return self, end_index
+ end
+}
+
+local _connect_response =
+ _base:new {
+ proto_ver = 0,
+ timeout = 0,
+ session_id = 0,
+ password = "",
+ unpack = function(self, bytes, start_index)
+ local vars, end_index = unpack(">iilS", bytes, start_index)
+ self.proto_ver, self.timeout, self.session_id, self.password = tbunpack(vars)
+ return self, end_index
+ end
+}
+
+local function unpack_strings(str)
+ local size = strlen(str)
+ local pos = 0
+ local str_set = {}
+ local index = 1
+ while size > pos do
+ local vars = unpack(">i", strsub(str, 1 + pos, 4 + pos))
+ local len = tbunpack(vars)
+ vars = unpack(">c" .. len, strsub(str, 5 + pos, 5 + pos + len - 1))
+ local s = tbunpack(vars)
+ str_set[index] = s
+ index = index + 1
+ pos = pos + len + 4
+ end
+ return str_set
+end
+
+local _get_children_response =
+ _base:new {
+ paths = {},
+ unpack = function(self, bytes, start_index)
+ self.paths = unpack_strings(strsub(bytes, 21))
+ return self
+ end
+}
+
+local _watch_event =
+ _base:new {
+ type = 0,
+ state = 0,
+ paths = {},
+ unpack = function(self, bytes, start_index)
+ local vars = unpack(">ii", bytes, start_index)
+ self.type, self.state = tbunpack(vars)
+ self.paths = unpack_strings(strsub(bytes, 25))
+ return self
+ end
+}
+
+_M.reply_header = _reply_header
+_M.connect_response = _connect_response
+_M.get_children_response = _get_children_response
+_M.watch_event = _watch_event
+
+return _M
diff --git a/rockspec/shenyu-nginx-main-0.rockspec b/rockspec/shenyu-nginx-main-0.rockspec
index f6ef077..df19e0d 100644
--- a/rockspec/shenyu-nginx-main-0.rockspec
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -15,6 +15,7 @@ dependencies = {
"lua-resty-balancer >= 0.04",
"lua-resty-http >= 0.15",
"lua-cjson = 2.1.0.6-1",
+ "stringy = 0.7-0",
}
build = {
@@ -23,5 +24,14 @@ build = {
["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
["shenyu.register.nacos"] = "lib/shenyu/register/nacos.lua",
["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
+ ["shenyu.register.zookeeper"] = "lib/shenyu/register/zookeeper.lua",
+ ["shenyu.register.zookeeper.connection"] = "lib/shenyu/register/zookeeper/connection.lua",
+ ["shenyu.register.zookeeper.zk_client"] = "lib/shenyu/register/zookeeper/zk_client.lua",
+ ["shenyu.register.zookeeper.zk_cluster"] = "lib/shenyu/register/zookeeper/zk_cluster.lua",
+ ["shenyu.register.zookeeper.zk_const"] = "lib/shenyu/register/zookeeper/zk_const.lua",
+ ["shenyu.register.zookeeper.zk_proto"] = "lib/shenyu/register/zookeeper/zk_proto.lua",
+ ["shenyu.register.core.string"] = "lib/shenyu/register/core/string.lua",
+ ["shenyu.register.core.struct"] = "lib/shenyu/register/core/struct.lua",
+ ["shenyu.register.core.utils"] = "lib/shenyu/register/core/utils.lua",
}
}