You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shenyu.apache.org by im...@apache.org on 2022/07/02 09:26:36 UTC

[incubator-shenyu-nginx] branch main updated: 1. commit zookeeper connection (#10)

This is an automated email from the ASF dual-hosted git repository.

impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git


The following commit(s) were added to refs/heads/main by this push:
     new eb1bbc8  1. commit zookeeper connection (#10)
eb1bbc8 is described below

commit eb1bbc8a69f30ad22d1e4e7a97e0bfd40ba9b9e2
Author: Sixh-PrFor <ch...@163.com>
AuthorDate: Sat Jul 2 17:26:33 2022 +0800

    1. commit zookeeper connection (#10)
    
    * 1. commit zookeeper connection
    
    * Increase the processing of heartbeats
    
    * 1. Send a request to subscribe
    
    * 1. zookeeper watch event...
    
    * 1. zookeeper watch event...
    
    * 1. update zookeeper proto.
    
    * 1. update zookeeper proto.
    
    * update zookeeper proto.
    
    * update zookeeper proto.
    
    * Modify Chinese
    
    * Modify Chinese
    
    * Modify Chinese
---
 .gitignore                                   |   3 +
 example/zookeeper/nginx.conf                 |  55 +++++++
 lib/shenyu/register/balancer.lua             |   2 -
 lib/shenyu/register/core/string.lua          |  31 ++++
 lib/shenyu/register/core/struct.lua          | 209 +++++++++++++++++++++++++++
 lib/shenyu/register/core/utils.lua           |  39 +++++
 lib/shenyu/register/etcd.lua                 |  46 +++---
 lib/shenyu/register/zookeeper.lua            |  95 ++++++++++++
 lib/shenyu/register/zookeeper/connection.lua | 127 ++++++++++++++++
 lib/shenyu/register/zookeeper/zk_client.lua  | 186 ++++++++++++++++++++++++
 lib/shenyu/register/zookeeper/zk_cluster.lua |  86 +++++++++++
 lib/shenyu/register/zookeeper/zk_const.lua   |  85 +++++++++++
 lib/shenyu/register/zookeeper/zk_proto.lua   | 157 ++++++++++++++++++++
 rockspec/shenyu-nginx-main-0.rockspec        |  10 ++
 14 files changed, 1111 insertions(+), 20 deletions(-)

diff --git a/.gitignore b/.gitignore
index 276ba56..af1c99a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,3 +29,6 @@ logs/
 *.diff
 *.patch
 *.tmp
+
+/out/
+/.vscode/
diff --git a/example/zookeeper/nginx.conf b/example/zookeeper/nginx.conf
new file mode 100644
index 0000000..a53b238
--- /dev/null
+++ b/example/zookeeper/nginx.conf
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+worker_processes  2;
+daemon off;
+error_log /dev/stdout debug;
+
+events {
+    worker_connections 1024;
+}
+http {
+    lua_shared_dict shenyu_storage 1m;
+
+#     lua_package_path "$prefix/lib/?.lua;;";
+
+    init_worker_by_lua_block {
+        local register = require("shenyu.register.zookeeper")
+        register.init({
+           servers = {"127.0.0.1:2181"},
+           shenyu_storage = ngx.shared.shenyu_storage,
+           balancer_type = "chash"
+        });
+    }
+
+    upstream shenyu {
+        server 0.0.0.1;
+        balancer_by_lua_block {
+            require("shenyu.register.zookeeper").pick_and_set_peer()
+        }
+    }
+
+    server {
+        listen 80;
+
+        location ~ /* {
+            proxy_pass http://shenyu;
+        }
+    }
+}
+
+	
diff --git a/lib/shenyu/register/balancer.lua b/lib/shenyu/register/balancer.lua
index 39d37ad..5407bc8 100644
--- a/lib/shenyu/register/balancer.lua
+++ b/lib/shenyu/register/balancer.lua
@@ -14,7 +14,6 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-
 local _M = {}
 local str_null = string.char(0)
 
@@ -25,7 +24,6 @@ function _M.new(balancer_type)
             local servers, nodes = {}, {}
             for serv, weight in pairs(server_list) do
                 local id = string.gsub(serv, ":", str_null)
-
                 servers[id] = serv
                 nodes[id] = weight
             end
diff --git a/lib/shenyu/register/core/string.lua b/lib/shenyu/register/core/string.lua
new file mode 100644
index 0000000..089b4b5
--- /dev/null
+++ b/lib/shenyu/register/core/string.lua
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local table_insert          = table.insert
+local _M                    ={}
+
+
+function _M.split(str, delimiter)
+    if not str or str == "" then return {} end
+    if not delimiter or delimiter == "" then return { str } end
+    local result = {}
+    for match in (str .. delimiter):gmatch("(.-)" .. delimiter) do
+        table_insert(result, match)
+    end
+    return result
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/shenyu/register/core/struct.lua b/lib/shenyu/register/core/struct.lua
new file mode 100644
index 0000000..d26d350
--- /dev/null
+++ b/lib/shenyu/register/core/struct.lua
@@ -0,0 +1,209 @@
+--[[
+ * Copyright (c) 2015-2020 Iryont <https://github.com/iryont/lua-struct>
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+]]
+local unpack = table.unpack or _G.unpack
+local struct = {}
+
+function struct.pack(format, ...)
+  local stream = {}
+  local vars = {...}
+  local endianness = true
+
+  for i = 1, format:len() do
+    local opt = format:sub(i, i)
+
+    if opt == "<" then
+      endianness = true
+    elseif opt == ">" then
+      endianness = false
+    elseif opt:find("[bBhHiIlL]") then
+      local n = opt:find("[hH]") and 2 or opt:find("[iI]") and 4 or opt:find("[lL]") and 8 or 1
+      local val = tonumber(table.remove(vars, 1))
+
+      local bytes = {}
+      for j = 1, n do
+        table.insert(bytes, string.char(val % (2 ^ 8)))
+        val = math.floor(val / (2 ^ 8))
+      end
+
+      if not endianness then
+        table.insert(stream, string.reverse(table.concat(bytes)))
+      else
+        table.insert(stream, table.concat(bytes))
+      end
+    elseif opt:find("[fd]") then
+      local val = tonumber(table.remove(vars, 1))
+      local sign = 0
+
+      if val < 0 then
+        sign = 1
+        val = -val
+      end
+
+      local mantissa, exponent = math.frexp(val)
+      if val == 0 then
+        mantissa = 0
+        exponent = 0
+      else
+        mantissa = (mantissa * 2 - 1) * math.ldexp(0.5, (opt == "d") and 53 or 24)
+        exponent = exponent + ((opt == "d") and 1022 or 126)
+      end
+
+      local bytes = {}
+      if opt == "d" then
+        val = mantissa
+        for i = 1, 6 do
+          table.insert(bytes, string.char(math.floor(val) % (2 ^ 8)))
+          val = math.floor(val / (2 ^ 8))
+        end
+      else
+        table.insert(bytes, string.char(math.floor(mantissa) % (2 ^ 8)))
+        val = math.floor(mantissa / (2 ^ 8))
+        table.insert(bytes, string.char(math.floor(val) % (2 ^ 8)))
+        val = math.floor(val / (2 ^ 8))
+      end
+
+      table.insert(bytes, string.char(math.floor(exponent * ((opt == "d") and 16 or 128) + val) % (2 ^ 8)))
+      val = math.floor((exponent * ((opt == "d") and 16 or 128) + val) / (2 ^ 8))
+      table.insert(bytes, string.char(math.floor(sign * 128 + val) % (2 ^ 8)))
+      val = math.floor((sign * 128 + val) / (2 ^ 8))
+
+      if not endianness then
+        table.insert(stream, string.reverse(table.concat(bytes)))
+      else
+        table.insert(stream, table.concat(bytes))
+      end
+    elseif opt == "s" then
+      table.insert(stream, tostring(table.remove(vars, 1)))
+      table.insert(stream, string.char(0))
+    elseif opt == "c" then
+      local n = format:sub(i + 1):match("%d+")
+      local str = tostring(table.remove(vars, 1))
+      local len = tonumber(n)
+      if len <= 0 then
+        len = str:len()
+      end
+      if len - str:len() > 0 then
+        str = str .. string.rep(" ", len - str:len())
+      end
+      table.insert(stream, str:sub(1, len))
+      i = i + n:len()
+    end
+  end
+
+  return table.concat(stream)
+end
+
+function struct.unpack(format, stream, pos)
+  local vars = {}
+  local iterator = pos or 1
+  local endianness = true
+
+  for i = 1, format:len() do
+    local opt = format:sub(i, i)
+
+    if opt == "<" then
+      endianness = true
+    elseif opt == ">" then
+      endianness = false
+    elseif opt:find("[bBhHiIlL]") then
+      local n = opt:find("[hH]") and 2 or opt:find("[iI]") and 4 or opt:find("[lL]") and 8 or 1
+      local signed = opt:lower() == opt
+
+      local val = 0
+      for j = 1, n do
+        local byte = string.byte(stream:sub(iterator, iterator))
+        if endianness then
+          val = val + byte * (2 ^ ((j - 1) * 8))
+        else
+          val = val + byte * (2 ^ ((n - j) * 8))
+        end
+        iterator = iterator + 1
+      end
+
+      if signed and val >= 2 ^ (n * 8 - 1) then
+        val = val - 2 ^ (n * 8)
+      end
+
+      table.insert(vars, math.floor(val))
+    elseif opt:find("[fd]") then
+      local n = (opt == "d") and 8 or 4
+      local x = stream:sub(iterator, iterator + n - 1)
+      iterator = iterator + n
+
+      if not endianness then
+        x = string.reverse(x)
+      end
+
+      local sign = 1
+      local mantissa = string.byte(x, (opt == "d") and 7 or 3) % ((opt == "d") and 16 or 128)
+      for i = n - 2, 1, -1 do
+        mantissa = mantissa * (2 ^ 8) + string.byte(x, i)
+      end
+
+      if string.byte(x, n) > 127 then
+        sign = -1
+      end
+
+      local exponent =
+        (string.byte(x, n) % 128) * ((opt == "d") and 16 or 2) +
+        math.floor(string.byte(x, n - 1) / ((opt == "d") and 16 or 128))
+      if exponent == 0 then
+        table.insert(vars, 0.0)
+      else
+        mantissa = (math.ldexp(mantissa, (opt == "d") and -52 or -23) + 1) * sign
+        table.insert(vars, math.ldexp(mantissa, exponent - ((opt == "d") and 1023 or 127)))
+      end
+    elseif opt == "s" then
+      local bytes = {}
+      for j = iterator, stream:len() do
+        if stream:sub(j, j) == string.char(0) or stream:sub(j) == "" then
+          break
+        end
+
+        table.insert(bytes, stream:sub(j, j))
+      end
+
+      local str = table.concat(bytes)
+      iterator = iterator + str:len() + 1
+      table.insert(vars, str)
+    elseif opt == "c" then
+      local n = format:sub(i + 1):match("%d+")
+      local len = tonumber(n)
+      if len <= 0 then
+        len = table.remove(vars)
+      end
+
+      table.insert(vars, stream:sub(iterator, iterator + len - 1))
+      iterator = iterator + len
+      i = i + n:len()
+    end
+  end
+
+  return vars, iterator
+end
+
+function struct.tbunpack(vars)
+  -- body
+  return unpack(vars)
+end
+
+return struct
diff --git a/lib/shenyu/register/core/utils.lua b/lib/shenyu/register/core/utils.lua
new file mode 100644
index 0000000..5f74206
--- /dev/null
+++ b/lib/shenyu/register/core/utils.lua
@@ -0,0 +1,39 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local str = require("shenyu.register.core.string")
+local _M = {}
+
+--Delimited string
+function _M.paras_host(host, delimiter)
+    return str.split(host, delimiter)
+end
+
+function _M.long_to_hex_string(long)
+    return string.format("0X%06X", long)
+end
+
+-- table len
+function _M.table_len(args)
+    -- body
+    local n = 0
+    if args then
+        n = #args
+    end
+    return n
+end
+
+return _M
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
index 4fd7c4f..da20d13 100644
--- a/lib/shenyu/register/etcd.lua
+++ b/lib/shenyu/register/etcd.lua
@@ -83,7 +83,7 @@ local function parse_base_url(base_url)
         host = m[2],
         port = tonumber(m[3]),
         base_url = base_url,
-        prefix = detect_etcd_version(base_url),
+        prefix = detect_etcd_version(base_url)
     }
 end
 
@@ -95,14 +95,18 @@ end
 local function fetch_shenyu_instances(conf)
     local range_request = {
         key = encode_base64(_M.start_key),
-        range_end = encode_base64(_M.end_key),
+        range_end = encode_base64(_M.end_key)
     }
 
     local httpc = http.new()
-    local res, err = httpc:request_uri(conf.base_url .. conf.prefix .. "/kv/range", {
-        method = "POST",
-        body = json.encode(range_request),
-    })
+    local res, err =
+        httpc:request_uri(
+        conf.base_url .. conf.prefix .. "/kv/range",
+        {
+            method = "POST",
+            body = json.encode(range_request)
+        }
+    )
     if not res then
         return nil, "failed to list shenyu instances from etcd, " .. (err or "unknown")
     end
@@ -267,11 +271,14 @@ local function watch(premature, watching)
     else
         local conf = _M.etcd_conf
         local httpc = http.new()
-        local ok, err = httpc:connect({
-            scheme = conf.scheme,
-            host = conf.host,
-            port = tonumber(conf.port),
-        })
+        local ok, err =
+            httpc:connect(
+            {
+                scheme = conf.scheme,
+                host = conf.host,
+                port = tonumber(conf.port)
+            }
+        )
         if not ok then
             log(ERR, "failed to connect to etcd server", err)
             _M.time_at = 3
@@ -292,15 +299,18 @@ local function watch(premature, watching)
             create_request = {
                 key = encode_base64(_M.start_key),
                 range_end = encode_base64(_M.end_key),
-                start_revision = _M.revision,
+                start_revision = _M.revision
             }
         }
 
-        local res, err = httpc:request({
-            path = "/v3/watch",
-            method = "POST",
-            body = json.encode(request),
-        })
+        local res, err =
+            httpc:request(
+            {
+                path = "/v3/watch",
+                method = "POST",
+                body = json.encode(request)
+            }
+        )
         if not res then
             log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
             _M.time_at = 3
@@ -330,7 +340,7 @@ local function watch(premature, watching)
         end
     end
 
-    :: continue ::
+    ::continue::
     local ok, err = ngx_timer_at(_M.time_at, watch, watching)
     if not ok then
         log(ERR, "failed to start watch: ", err)
diff --git a/lib/shenyu/register/zookeeper.lua b/lib/shenyu/register/zookeeper.lua
new file mode 100644
index 0000000..b7a3518
--- /dev/null
+++ b/lib/shenyu/register/zookeeper.lua
@@ -0,0 +1,95 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local zk_cluster = require("shenyu.register.zookeeper.zk_cluster")
+local ngx_balancer = require("ngx.balancer")
+local balancer = require("shenyu.register.balancer")
+local const = require("shenyu.register.zookeeper.zk_const")
+local ngx_timer_at = ngx.timer.at
+local xpcall = xpcall
+local ngx_log = ngx.log
+local math = string.match
+local zc
+local _M = {
+    isload = 0,
+    nodes = {}
+}
+
+local function watch_data(data)
+    local server_lists = {}
+    -- body
+    for index, value in ipairs(data) do
+        if math(value, ":") then
+            server_lists[value] = 1
+        end
+    end
+    local s_nodes = _M.nodes
+    for host, index in pairs(server_lists) do
+        if not s_nodes[host] then
+            ngx_log(ngx.INFO, "add shenyu server:" .. host)
+        end
+    end
+    for host, index in pairs(s_nodes) do
+        if not server_lists[host] then
+            ngx_log(ngx.INFO, "remove shenyu server:" .. host)
+        end
+    end
+    if (_M.isload > 1) then
+        _M.balancer:reinit(server_lists)
+    else
+        _M.balancer:init(server_lists)
+    end
+    _M.isload = _M.isload + 1
+    _M.nodes = server_lists
+end
+
+local function watch(premature, path)
+    local ok, err = zc:connect()
+    if ok then
+        ok, err =
+            xpcall(
+            zc:add_watch(path, watch_data),
+            function(err)
+                ngx_log(ngx.ERR, "zookeeper start watch error..." .. tostring(err))
+            end
+        )
+    end
+    return ok, err
+end
+
+function _M.init(config)
+    _M.storage = config.shenyu_storage
+    _M.balancer = balancer.new(config.balancer_type)
+    zc = zk_cluster:new(config)
+    if ngx.worker.id() == 0 then
+        -- Start the zookeeper watcher
+        local ok, err = ngx_timer_at(2, watch, const.ZK_WATCH_PATH)
+        if not ok then
+            ngx_log(ngx.ERR, "failed to start watch: " .. err)
+        end
+        return
+    end
+end
+
+function _M.pick_and_set_peer(key)
+    local server = _M.balancer:find(key)
+    if not server then
+        ngx_log(ngx.ERR, "not find shenyu server..")
+        return
+    end
+    ngx_balancer.set_current_peer(server)
+end
+return _M
diff --git a/lib/shenyu/register/zookeeper/connection.lua b/lib/shenyu/register/zookeeper/connection.lua
new file mode 100644
index 0000000..4076f91
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/connection.lua
@@ -0,0 +1,127 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local proto = require("shenyu.register.zookeeper.zk_proto")
+local struct = require("shenyu.register.core.struct")
+local tcp = ngx.socket.tcp
+local pack = struct.pack
+local unpack = struct.unpack
+local tbunpack = struct.tbunpack
+local ngx_log = ngx.log
+local _timeout = 60 * 1000
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self)
+    local sock, err = tcp()
+    if not tcp then
+        return nil, err
+    end
+    return setmetatable({sock = sock, timeout = _timeout}, mt)
+end
+
+function _M.connect(self, ip, port)
+    -- body
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized tcp."
+    end
+    local ok, err = sock:connect(ip, port)
+    if not ok then
+        ngx_log(ngx.DEBUG, "connect host:" .. ip .. err)
+        return ok, err
+    end
+    return ok, nil
+end
+
+function _M.write(self, req)
+    -- body
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized tpc."
+    end
+    return sock:send(req)
+end
+
+function _M.read(self, len)
+    -- body
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized tpc."
+    end
+    return sock:receive(len)
+end
+
+function _M.read_len(self)
+    local b, err = self:read(4)
+    if not b then
+        return nil, "error"
+    end
+    local len = tbunpack(unpack(">i", b))
+    return len
+end
+
+function _M.read_headler(self)
+    local len = self:read_len()
+    local b, err = self:read(len)
+    if not b then
+        return nil, "error"
+    end
+    local h, end_index = proto.reply_header:unpack(b, 1)
+    if not h then
+        return nil, nil, 0
+    end
+    return h, b, end_index
+end
+
+function _M.close(self)
+    -- body
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized tpc."
+    end
+    sock.close()
+end
+
+function _M.set_timeout(self, timeout)
+    -- body
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized"
+    end
+    sock:settimeout(timeout)
+end
+
+function _M.set_keepalive(self, ...)
+    local sock = self.sock
+    if not sock then
+        return nil, "not initialized"
+    end
+
+    return sock:setkeepalive(...)
+end
+
+function _M.set_timeouts(self, connect_timeout, send_timeout, read_timeout)
+    local sock = self.sock
+    if not sock then
+        error("not initialized", 2)
+        return
+    end
+
+    sock:settimeouts(connect_timeout, send_timeout, read_timeout)
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_client.lua b/lib/shenyu/register/zookeeper/zk_client.lua
new file mode 100644
index 0000000..ceb490b
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_client.lua
@@ -0,0 +1,186 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local util = require("shenyu.register.core.utils")
+local const = require("shenyu.register.zookeeper.zk_const")
+local proto = require("shenyu.register.zookeeper.zk_proto")
+local connection = require("shenyu.register.zookeeper.connection")
+local ngx_log = ngx.log
+local now = ngx.now
+local exiting = ngx.worker.exiting
+local sleep = ngx.sleep
+local strlen = string.len
+local _timeout = 60 * 1000
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self)
+    local conn_, err = connection:new()
+    if not conn_ then
+        return nil, "initialized connection error" .. err
+    end
+    conn_:set_timeout(_timeout)
+    conn_:set_keepalive()
+    return setmetatable({conn = conn_}, mt)
+end
+
+function _M.connect(self, host)
+    -- body
+    local conn = self.conn
+    local iptables = util.paras_host(host, ":")
+    local ip = iptables[1]
+    local port = iptables[2]
+    local byt = conn:connect(ip, port)
+    if not byt then
+        return nil, "connection error" .. host
+    end
+    local bytes, err = proto:serialize(proto.request_header, proto.connect_request)
+    local b, err = conn:write(bytes)
+    if not b then
+        return nil, "connect error " .. ip + ":" .. port
+    end
+    local len = conn:read_len()
+    if not len then
+        return nil, "error"
+    end
+    local bytes = conn:read(len)
+    if not bytes then
+        return nil, "connection read error"
+    end
+    local rsp = proto.connect_response:unpack(bytes, 1)
+    if not rsp then
+        return nil, "error"
+    end
+    self.xid = 0
+    local t = rsp.timeout
+    self.session_timeout = rsp.timeout
+    self.ping_time = (t / 3) / 1000
+    self.host = host
+    self.session_id = rsp.session_id
+    local tostring =
+        "proto_ver:" ..
+        rsp.proto_ver ..
+            "," .. "timeout:" .. rsp.timeout .. "," .. "session_id:" .. util.long_to_hex_string(rsp.session_id)
+    ngx_log(ngx.INFO, tostring)
+    return true, nil
+end
+
+function _M.get_children(self, path)
+    return self:_get_children(path, 0)
+end
+
+function _M._get_children(self, path, is_watch)
+    local conn = self.conn
+    if not conn then
+        return nil, "not initialized connection"
+    end
+    local xid = self.xid + 1
+    local h = proto.request_header
+    h.xid = xid
+    h.type = const.ZOO_GET_CHILDREN
+    local r = proto.get_children_request
+    r.path = path
+    r.watch = is_watch
+    local req = proto:serialize(h, r)
+    local bytes, err = conn:write(req)
+    if not bytes then
+        return bytes, "write bytes error"
+    end
+    --  If other data is received, it means that the data of the _get_children command has not been received
+    ::continue::
+    local rsp_header, bytes, end_index = conn:read_headler()
+    if not rsp_header then
+        return nil, "read headler error"
+    end
+    if rsp_header.err ~= 0 then
+        ngx_log(ngx.ERR, "zookeeper remote error: " .. const.get_err_msg(rsp_header.err) .. "," .. path)
+        return nil, const.get_err_msg(rsp_header.err)
+    end
+    if strlen(bytes) > 16 and rsp_header.xid > 0 then
+        self.xid = rsp_header.xid + 1
+        local get_children_response = proto.get_children_response:unpack(bytes, end_index)
+        return {
+            xid = rsp_header.xid,
+            zxid = rsp_header.zxid,
+            path = get_children_response.paths
+        }
+    end
+    if rsp_header.xid == const.XID_PING then
+        goto continue
+    end
+    return nil, "get_children error"
+end
+
+function _M.add_watch(self, path)
+    -- body
+    local d, e = self:_get_children(path, 1)
+    if not d then
+        return d, e
+    end
+    self.watch = true
+    return d, nil
+end
+
+local function reply_read(self, callback)
+    local conn = self.conn
+    local h = proto.request_header
+    h.xid = const.XID_PING
+    h.type = const.ZOO_PING_OP
+    local req = proto:serialize(h, proto.ping_request)
+    local ok, err = conn:write(req)
+    if ok then
+        local h, bytes, end_start = conn:read_headler()
+        if h.xid == const.XID_PING then
+            ngx_log(
+                ngx.DEBUG,
+                "Got ping zookeeper response host:" ..
+                    self.host .. " for sessionId:" .. util.long_to_hex_string(self.session_id)
+            )
+        elseif h.xid == const.XID_WATCH_EVENT then
+            --decoding
+            local watch_event = proto.watch_event:unpack(bytes, end_start)
+            -- local xid, done, err, type, state = unpack(">iliii", bytes)
+            -- local eventPath = unpack_strings(strsub(bytes, 25))
+            local t = watch_event.paths[1]
+            local d, e = self:add_watch("" .. t)
+            if d then
+                callback(d.path)
+            end
+        end
+    end
+    return ok, err
+end
+
+function _M.watch_receive(self, callback)
+    local last_time = 0
+    while true do
+        if exiting() then
+            self.conn.close()
+            return true
+        end
+        local can_ping = now() - last_time > self.ping_time
+        if can_ping then
+            local ok, err = reply_read(self, callback)
+            if err then
+                return nil, err
+            end
+            last_time = now()
+        end
+        sleep(0.2)
+    end
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_cluster.lua b/lib/shenyu/register/zookeeper/zk_cluster.lua
new file mode 100644
index 0000000..d13f4f3
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_cluster.lua
@@ -0,0 +1,86 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations un
+local zkclient = require("shenyu.register.zookeeper.zk_client")
+local ngx_log = ngx.log
+local ipairs = ipairs
+local _M = {}
+local mt = {__index = _M}
+
+function _M.new(self, zk_config)
+    -- body
+    return setmetatable({servers = zk_config.servers}, mt)
+end
+
+function _M.connect(self)
+    local servers = self.servers
+    if not servers then
+        return nil, "servers is null"
+    end
+    -- initialize
+    local client, err = zkclient:new()
+    if not client then
+        ngx_log(ngx.ERR, "Failed to initialize zk Client" .. err)
+        return nil, err
+    end
+    for _, _host in ipairs(servers) do
+        ngx_log(ngx.INFO, "try to connect to zookeeper host : " .. _host)
+        local ok, err = client:connect(_host)
+        if not ok then
+            ngx_log(ngx.INFO, "Failed to connect to zookeeper host : " .. _host .. err)
+        else
+            ngx_log(ngx.INFO, "Successful connection to zookeeper host : " .. _host)
+            self.client = client
+            return client
+        end
+    end
+    ngx_log(ngx.ERR, "Failed to connect to zookeeper")
+    return nil
+end
+
+function _M.get_children(self, path)
+    local client = self.client
+    if not client then
+        ngx_log(ngx.ERR, "conn not initialized")
+    end
+    local data, error = client:get_children(path)
+    if not data then
+        return nil, error
+    end
+    return data, nil
+end
+
+local function _watch_receive(self, callback)
+    local client = self.client
+    if not client then
+        ngx_log(ngx.ERR, "conn not initialized")
+    end
+    return client:watch_receive(callback)
+end
+
+function _M.add_watch(self, path, callback)
+    local client = self.client
+    if not client then
+        ngx_log(ngx.ERR, "conn not initialized")
+    end
+    local data, err = client:add_watch(path)
+    if data then
+        callback(data.path)
+        return _watch_receive(self, callback)
+    end
+    return data, err
+end
+
+return _M
diff --git a/lib/shenyu/register/zookeeper/zk_const.lua b/lib/shenyu/register/zookeeper/zk_const.lua
new file mode 100644
index 0000000..2aa7e3a
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_const.lua
@@ -0,0 +1,85 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"), you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+-- XID
+_M.XID_WATCH_EVENT = -1
+
+_M.XID_PING = -2
+
+_M.XID_SET_WATCHES = -8
+
+--op code
+
+_M.ZOO_GET_CHILDREN = 8
+
+_M.ZOO_PING_OP = 11
+
+_M.ZOO_GET_CHILDREN2 = 12
+
+_M.ZOO_SET_WATCHES = 101
+
+_M.ZOO_ADD_WATCH = 106
+
+_M.ZK_WATCH_PATH = "/shenyu/register/instance";
+
+--Definition of error codes.
+local error_code = {
+    "0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-100",
+    "-101", "-102", "-103", "-108", "-110", "-111", "-112",
+    "-113", "-114", "-115", "-118", "-119" }
+
+
+--errorcode
+local error_msg = {
+    err0 = "Ok",
+    err1 = "System error",
+    err2 = "Runtime inconsistency",
+    err3 = "Data inconsistency",
+    err4 = "Connection loss",
+    err5 = "Marshalling error",
+    err6 = "Operation is unimplemented",
+    err7 = "Operation timeout",
+    err8 = "Invalid arguments",
+    err9 = "API errors.",
+    err101 = "Node does not exist",
+    err102 = "Not authenticated",
+    err103 = "Version conflict",
+    err108 = "Ephemeral nodes may not have children",
+    err110 = "The node already exists",
+    err111 = "The node has children",
+    err112 = "The session has been expired by the server",
+    err113 = "Invalid callback specified",
+    err114 = "Invalid ACL specified",
+    err115 = "Client authentication failed",
+    err118 = "Session moved to another server, so operation is ignored",
+    err119 = "State-changing request is passed to read-only server",
+}
+for i = 1, #error_code do
+    local cmd = "err" .. (error_code[i] * -1)
+    _M[cmd] = error_msg.cmd
+end
+
+function _M.get_err_msg(code)
+    if not code then
+        return "unknown"
+    end
+    return error_msg["err" .. (code * -1)]
+end
+
+return _M
\ No newline at end of file
diff --git a/lib/shenyu/register/zookeeper/zk_proto.lua b/lib/shenyu/register/zookeeper/zk_proto.lua
new file mode 100644
index 0000000..2eacefe
--- /dev/null
+++ b/lib/shenyu/register/zookeeper/zk_proto.lua
@@ -0,0 +1,157 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an"AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local struct = require("shenyu.register.core.struct")
+local pack = struct.pack
+local unpack = struct.unpack
+local tbunpack = struct.tbunpack
+local strbyte = string.byte
+local strlen = string.len
+local strsub = string.sub
+
+local _M = {}
+
+local _base = {
+    new = function(self, o)
+        o = o or {}
+        setmetatable(o, self)
+        return o
+    end
+}
+
+local _request_header =
+    _base:new {
+    xid = 0,
+    type = 0,
+    pack = function(self)
+        return pack(">II", self.xid, self.type)
+    end
+}
+
+local _get_children_request =
+    _base:new {
+    path = "",
+    watch = 0,
+    pack = function(self)
+        local path_len = strlen(self.path)
+        return pack(">ic" .. path_len .. "b", path_len, self.path, strbyte(self.watch))
+    end
+}
+
+local _ping_request =
+    _base:new {
+    pack = function(self)
+        local stream = {}
+        return table.concat(stream)
+    end
+}
+
+local _connect_request =
+    _base:new {
+    protocol_version = 0,
+    last_zxid_seen = 0,
+    timeout = 0,
+    session_id = 0,
+    password = "",
+    pack = function(self)
+        return pack(">lilic16", 0, 0, 0, 0, "")
+    end
+}
+
+function _M.serialize(self, h, request)
+    local h_bytes = h:pack()
+    local r_bytes = request:pack()
+    local len = h_bytes:len() + r_bytes:len()
+    local len_bytes = pack(">i", len)
+    return len_bytes .. h_bytes .. r_bytes
+end
+
+_M.get_children_request = _get_children_request
+_M.request_header = _request_header
+_M.ping_request = _ping_request
+_M.connect_request = _connect_request
+
+--basics of response.
+local _reply_header =
+    _base:new {
+    xid = 0, -- int
+    zxid = 0, -- long
+    err = 0, -- int
+    unpack = function(self, bytes, start_index)
+        local vars, end_index = unpack(">ili", bytes, start_index)
+        self.xid, self.zxid, self.err = tbunpack(vars)
+        return self, end_index
+    end
+}
+
+local _connect_response =
+    _base:new {
+    proto_ver = 0,
+    timeout = 0,
+    session_id = 0,
+    password = "",
+    unpack = function(self, bytes, start_index)
+        local vars, end_index = unpack(">iilS", bytes, start_index)
+        self.proto_ver, self.timeout, self.session_id, self.password = tbunpack(vars)
+        return self, end_index
+    end
+}
+
+local function unpack_strings(str)
+    local size = strlen(str)
+    local pos = 0
+    local str_set = {}
+    local index = 1
+    while size > pos do
+        local vars = unpack(">i", strsub(str, 1 + pos, 4 + pos))
+        local len = tbunpack(vars)
+        vars = unpack(">c" .. len, strsub(str, 5 + pos, 5 + pos + len - 1))
+        local s = tbunpack(vars)
+        str_set[index] = s
+        index = index + 1
+        pos = pos + len + 4
+    end
+    return str_set
+end
+
+local _get_children_response =
+    _base:new {
+    paths = {},
+    unpack = function(self, bytes, start_index)
+        self.paths = unpack_strings(strsub(bytes, 21))
+        return self
+    end
+}
+
+local _watch_event =
+    _base:new {
+    type = 0,
+    state = 0,
+    paths = {},
+    unpack = function(self, bytes, start_index)
+        local vars = unpack(">ii", bytes, start_index)
+        self.type, self.state = tbunpack(vars)
+        self.paths = unpack_strings(strsub(bytes, 25))
+        return self
+    end
+}
+
+_M.reply_header = _reply_header
+_M.connect_response = _connect_response
+_M.get_children_response = _get_children_response
+_M.watch_event = _watch_event
+
+return _M
diff --git a/rockspec/shenyu-nginx-main-0.rockspec b/rockspec/shenyu-nginx-main-0.rockspec
index f6ef077..df19e0d 100644
--- a/rockspec/shenyu-nginx-main-0.rockspec
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -15,6 +15,7 @@ dependencies = {
     "lua-resty-balancer >= 0.04",
     "lua-resty-http >= 0.15",
     "lua-cjson = 2.1.0.6-1",
+    "stringy = 0.7-0",
 }
 
 build = {
@@ -23,5 +24,14 @@ build = {
       ["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
       ["shenyu.register.nacos"] = "lib/shenyu/register/nacos.lua",
       ["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
+      ["shenyu.register.zookeeper"] = "lib/shenyu/register/zookeeper.lua",
+      ["shenyu.register.zookeeper.connection"] = "lib/shenyu/register/zookeeper/connection.lua",
+      ["shenyu.register.zookeeper.zk_client"] = "lib/shenyu/register/zookeeper/zk_client.lua",
+      ["shenyu.register.zookeeper.zk_cluster"] = "lib/shenyu/register/zookeeper/zk_cluster.lua",
+      ["shenyu.register.zookeeper.zk_const"] = "lib/shenyu/register/zookeeper/zk_const.lua",
+      ["shenyu.register.zookeeper.zk_proto"] = "lib/shenyu/register/zookeeper/zk_proto.lua",
+      ["shenyu.register.core.string"] = "lib/shenyu/register/core/string.lua",
+      ["shenyu.register.core.struct"] = "lib/shenyu/register/core/struct.lua",
+      ["shenyu.register.core.utils"] = "lib/shenyu/register/core/utils.lua",
    }
 }