You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/02/23 02:27:03 UTC

[apisix] branch master updated: feat: add kubernetes discovery module (#4880)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b50f2  feat: add kubernetes discovery module (#4880)
f7b50f2 is described below

commit f7b50f28bae7ca6ed373fe7c1ea07e658a6bee3f
Author: zhixiongdu <ro...@libssl.com>
AuthorDate: Wed Feb 23 10:26:54 2022 +0800

    feat: add kubernetes discovery module (#4880)
    
    Co-authored-by: zhixiogndu <ad...@gmail.com>
    Co-authored-by: zhixiongdu <zh...@upchina.com>
---
 .github/workflows/kubernetes-ci.yml              |  90 +++
 Makefile                                         |   3 +-
 apisix/discovery/kubernetes/informer_factory.lua | 376 ++++++++++++
 apisix/discovery/kubernetes/init.lua             | 386 ++++++++++++
 apisix/discovery/kubernetes/schema.lua           | 140 +++++
 ci/kubernetes-ci.sh                              |  33 ++
 docs/zh/latest/config.json                       |   3 +-
 docs/zh/latest/discovery/kubernetes.md           | 120 ++++
 t_kubernetes/configs/account.yaml                |  44 ++
 t_kubernetes/configs/endpoint.yaml               |  58 ++
 t_kubernetes/configs/kind.yaml                   |  22 +
 t_kubernetes/discovery/kubernetes.t              | 714 +++++++++++++++++++++++
 12 files changed, 1987 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/kubernetes-ci.yml b/.github/workflows/kubernetes-ci.yml
new file mode 100644
index 0000000..b622d34
--- /dev/null
+++ b/.github/workflows/kubernetes-ci.yml
@@ -0,0 +1,90 @@
+# CI workflow that runs the Kubernetes discovery tests against a local
+# kind cluster.
+name: CI Kubernetes
+
+# Triggered by pushes and pull requests targeting master or release
+# branches; changes that only touch documentation are ignored.
+on:
+  push:
+    branches: [ master, 'release/**' ]
+    paths-ignore:
+      - 'docs/**'
+      - '**/*.md'
+  pull_request:
+    branches: [ master, 'release/**' ]
+    paths-ignore:
+      - 'docs/**'
+      - '**/*.md'
+
+# On master the group name includes the run number, so every run gets its
+# own group; on any other ref the group is the ref itself and a newer run
+# cancels the one still in progress.
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref == 'refs/heads/master' && github.run_number || github.ref }}
+  cancel-in-progress: true
+
+jobs:
+  kubernetes-discovery:
+    strategy:
+      fail-fast: false
+      matrix:
+        platform:
+          - ubuntu-18.04
+        # os_name selects the ./ci/<os_name>_runner.sh install script below
+        os_name:
+          - linux_openresty
+          - linux_openresty_1_17
+
+    runs-on: ${{ matrix.platform }}
+    timeout-minutes: 15
+    env:
+      SERVER_NAME: ${{ matrix.os_name }}
+      OPENRESTY_VERSION: default
+
+    steps:
+      - name: Check out code
+        uses: actions/checkout@v2.4.0
+        with:
+          submodules: recursive
+
+      # Only runs on release branches; exposes the last path component of
+      # the ref as the step output "version".
+      - name: Extract branch name
+        if: ${{ startsWith(github.ref, 'refs/heads/release/') }}
+        id: branch_env
+        shell: bash
+        run: |
+          echo "##[set-output name=version;]$(echo ${GITHUB_REF##*/})"
+
+      # Downloads kind and kubectl, creates the "apisix-test" cluster,
+      # applies the test account and endpoint manifests, writes the service
+      # account token to a file under /tmp and starts "kubectl proxy" on
+      # port 6445 in the background.
+      # NOTE(review): the KUBERNETES_* lines are only echoed to the job log
+      # (including the token content); they are not exported to later steps
+      # here - confirm the tests set these variables themselves.
+      - name: Setup kubernetes cluster
+        run: |
+          KIND_VERSION="v0.11.1"
+          KUBECTL_VERSION="v1.22.0"
+          curl -Lo ./kind "https://kind.sigs.k8s.io/dl/${KIND_VERSION}/kind-$(uname)-amd64"
+          curl -Lo ./kubectl "https://dl.k8s.io/release/${KUBECTL_VERSION}/bin/linux/amd64/kubectl"
+          chmod +x ./kind
+          chmod +x ./kubectl
+
+          ./kind create cluster --name apisix-test --config ./t_kubernetes/configs/kind.yaml
+
+          ./kubectl wait --for=condition=Ready nodes --all --timeout=180s
+
+          ./kubectl apply -f ./t_kubernetes/configs/account.yaml
+
+          ./kubectl apply -f ./t_kubernetes/configs/endpoint.yaml
+
+          KUBERNETES_CLIENT_TOKEN_CONTENT=$(./kubectl get secrets | grep apisix-test | awk '{system("./kubectl get secret -o jsonpath={.data.token} "$1" | base64 --decode")}')
+
+          KUBERNETES_CLIENT_TOKEN_DIR="/tmp/var/run/secrets/kubernetes.io/serviceaccount"
+
+          KUBERNETES_CLIENT_TOKEN_FILE=${KUBERNETES_CLIENT_TOKEN_DIR}/token
+
+          mkdir -p ${KUBERNETES_CLIENT_TOKEN_DIR}
+          echo -n "$KUBERNETES_CLIENT_TOKEN_CONTENT" > ${KUBERNETES_CLIENT_TOKEN_FILE}
+
+          echo 'KUBERNETES_SERVICE_HOST=127.0.0.1'
+          echo 'KUBERNETES_SERVICE_PORT=6443'
+          echo 'KUBERNETES_CLIENT_TOKEN='"${KUBERNETES_CLIENT_TOKEN_CONTENT}"
+          echo 'KUBERNETES_CLIENT_TOKEN_FILE='${KUBERNETES_CLIENT_TOKEN_FILE}
+
+          ./kubectl proxy -p 6445 &
+
+      # Installs build dependencies and Test::Nginx, then runs the
+      # matrix-selected runner script with OPENRESTY_VERSION preserved
+      # across sudo.
+      - name: Linux Install
+        run: |
+          sudo apt install -y cpanminus build-essential libncurses5-dev libreadline-dev libssl-dev perl libpcre3 libpcre3-dev libldap2-dev
+          sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
+          sudo --preserve-env=OPENRESTY_VERSION ./ci/${{ matrix.os_name }}_runner.sh do_install
+
+      - name: Run test cases
+        run: |
+          ./ci/kubernetes-ci.sh run_case
diff --git a/Makefile b/Makefile
index e37c801..f7546bf 100644
--- a/Makefile
+++ b/Makefile
@@ -284,11 +284,12 @@ install: runtime
 
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery
 	$(ENV_INSTALL) apisix/discovery/*.lua $(ENV_INST_LUADIR)/apisix/discovery/
-	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,dns,eureka,nacos}
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/discovery/{consul_kv,dns,eureka,nacos,kubernetes}
 	$(ENV_INSTALL) apisix/discovery/consul_kv/*.lua $(ENV_INST_LUADIR)/apisix/discovery/consul_kv
 	$(ENV_INSTALL) apisix/discovery/dns/*.lua $(ENV_INST_LUADIR)/apisix/discovery/dns
 	$(ENV_INSTALL) apisix/discovery/eureka/*.lua $(ENV_INST_LUADIR)/apisix/discovery/eureka
 	$(ENV_INSTALL) apisix/discovery/nacos/*.lua $(ENV_INST_LUADIR)/apisix/discovery/nacos
+	$(ENV_INSTALL) apisix/discovery/kubernetes/*.lua $(ENV_INST_LUADIR)/apisix/discovery/kubernetes
 
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/http
 	$(ENV_INSTALL) apisix/http/*.lua $(ENV_INST_LUADIR)/apisix/http/
diff --git a/apisix/discovery/kubernetes/informer_factory.lua b/apisix/discovery/kubernetes/informer_factory.lua
new file mode 100644
index 0000000..8b50fc3
--- /dev/null
+++ b/apisix/discovery/kubernetes/informer_factory.lua
@@ -0,0 +1,376 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local string = string
+local math = math
+local type = type
+local core = require("apisix.core")
+local http = require("resty.http")
+
+local empty_table = {}
+
+-- Build the query string of a LIST request from the informer's page limit,
+-- continue token and label/field selectors. Unset or empty optional values
+-- are left out of the query.
+local function list_query(informer)
+    local args = { limit = informer.limit }
+
+    local continue_token = informer.continue
+    if continue_token and continue_token ~= "" then
+        args.continue = continue_token
+    end
+
+    local labels = informer.label_selector
+    if labels and labels ~= "" then
+        args.labelSelector = labels
+    end
+
+    local fields = informer.field_selector
+    if fields and fields ~= "" then
+        args.fieldSelector = fields
+    end
+
+    return ngx.encode_args(args)
+end
+
+
+-- Fetch one page of the informer's resource list and feed every item to
+-- informer:on_added(item, "list"), then continue with the next page while
+-- the apiserver returns a non-empty `continue` token.
+--
+-- Side effects: updates informer.version (resourceVersion of the list) and
+-- informer.continue (token of the next page, empty when finished).
+--
+-- Returns true on success, or false, reason, message on failure. A failure
+-- on any page, not only the first one, is reported to the caller.
+local function list(httpc, apiserver, informer)
+    local response, err = httpc:request({
+        path = informer.path,
+        query = list_query(informer),
+        headers = {
+            ["Host"] = apiserver.host .. ":" .. apiserver.port,
+            ["Authorization"] = "Bearer " .. apiserver.token,
+            ["Accept"] = "application/json",
+            ["Connection"] = "keep-alive"
+        }
+    })
+
+    core.log.info("--raw=", informer.path, "?", list_query(informer))
+
+    if not response then
+        return false, "RequestError", err or ""
+    end
+
+    if response.status ~= 200 then
+        return false, response.reason, response:read_body() or ""
+    end
+
+    local body, err = response:read_body()
+    if err then
+        return false, "ReadBodyError", err
+    end
+
+    -- the body must be a "<Kind>List" object, anything else is unexpected
+    local data = core.json.decode(body)
+    if not data or data.kind ~= informer.list_kind then
+        return false, "UnexpectedBody", body
+    end
+
+    informer.version = data.metadata.resourceVersion
+
+    if informer.on_added then
+        for _, item in ipairs(data.items or empty_table) do
+            informer:on_added(item, "list")
+        end
+    end
+
+    informer.continue = data.metadata.continue
+    if informer.continue and informer.continue ~= "" then
+        -- tail call: propagate the result of the remaining pages instead of
+        -- dropping it and reporting success unconditionally
+        return list(httpc, apiserver, informer)
+    end
+
+    return true
+end
+
+
+-- Build the query string of a WATCH request: watch mode with bookmarks
+-- enabled, the server-side timeout taken from informer.overtime, plus the
+-- resourceVersion and label/field selectors when they are set and non-empty.
+local function watch_query(informer)
+    local args = {
+        watch = "true",
+        allowWatchBookmarks = "true",
+        timeoutSeconds = informer.overtime,
+    }
+
+    local resource_version = informer.version
+    if resource_version and resource_version ~= "" then
+        args.resourceVersion = resource_version
+    end
+
+    local labels = informer.label_selector
+    if labels and labels ~= "" then
+        args.labelSelector = labels
+    end
+
+    local fields = informer.field_selector
+    if fields and fields ~= "" then
+        args.fieldSelector = fields
+    end
+
+    return ngx.encode_args(args)
+end
+
+
+-- Cut complete watch events out of a chunk of the response stream and pass
+-- each one to callback(event_string, ...).
+--
+-- An event is a line of the form `{"type":...}` terminated by "\n". Whatever
+-- follows the last complete event is handed back so the caller can prepend
+-- it to the next chunk.
+--
+-- Returns true, remainder_body on success, or false, nil, reason, err when
+-- the regex fails or the callback rejects an event.
+local function split_event (body, callback, ...)
+    -- options: j = PCRE JIT, a = anchored, o = compile once
+    local gmatch_iterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jao")
+    if not gmatch_iterator then
+        return false, nil, "GmatchError", err
+    end
+
+    local captures
+    -- total length of all events matched so far
+    local captured_size = 0
+    local ok, reason
+    while true do
+        captures, err = gmatch_iterator()
+
+        if err then
+            return false, nil, "GmatchError", err
+        end
+
+        -- no more complete events in this chunk
+        if not captures then
+            break
+        end
+
+        captured_size = captured_size + #captures[0]
+
+        ok, reason, err = callback(captures[0], ...)
+        if not ok then
+            return false, nil, reason, err
+        end
+    end
+
+    -- the matched events are treated as a contiguous prefix of body; the
+    -- rest is an incomplete event waiting for more data
+    local remainder_body
+    if captured_size == #body then
+        remainder_body = ""
+    elseif captured_size == 0 then
+        remainder_body = body
+    elseif captured_size < #body then
+        remainder_body = string.sub(body, captured_size + 1)
+    end
+
+    return true, remainder_body
+end
+
+
+-- Decode a single watch event and invoke the matching informer callback.
+--
+-- Returns true when the event was handled, or false, reason, message:
+--   "UnexpectedBody" - the event is not valid JSON, lacks type/object, or is
+--                      an ERROR event with a code other than 410
+--   "ResourceGone"   - an ERROR event with code 410
+local function dispatch_event(event_string, informer)
+    local event = core.json.decode(event_string)
+
+    if not event or not event.type or not event.object then
+        return false, "UnexpectedBody", event_string
+    end
+
+    local tp = event.type
+
+    if tp == "ERROR" then
+        if event.object.code == 410 then
+            return false, "ResourceGone", nil
+        end
+        return false, "UnexpectedBody", event_string
+    end
+
+    -- remember the latest resourceVersion; watch_query() sends it with the
+    -- next watch request
+    local object = event.object
+    informer.version = object.metadata.resourceVersion
+
+    if tp == "ADDED" then
+        if informer.on_added then
+            informer:on_added(object, "watch")
+        end
+    elseif tp == "DELETED" then
+        if informer.on_deleted then
+            informer:on_deleted(object)
+        end
+    elseif tp == "MODIFIED" then
+        if informer.on_modified then
+            informer:on_modified(object)
+        end
+        -- any other type (e.g. "BOOKMARK") needs no callback: recording
+        -- the resourceVersion above is all that is done for it
+    end
+
+    return true
+end
+
+
+-- Watch the informer's resource over the already connected httpc, issuing up
+-- to 8 consecutive watch requests and dispatching every received event
+-- through dispatch_event().
+--
+-- Returns true when all watch requests finished or the apiserver reported
+-- the resource version as gone (410), or false, reason, message on a
+-- request, status, read or event-handling error.
+local function watch(httpc, apiserver, informer)
+    local watch_times = 8
+    for _ = 1, watch_times do
+        -- server-side timeout in seconds, with a random offset added
+        local watch_seconds = 1800 + math.random(9, 999)
+        informer.overtime = watch_seconds
+        -- the last set_timeouts() argument is kept 120s longer than the
+        -- server-side timeout requested via timeoutSeconds
+        local http_seconds = watch_seconds + 120
+        httpc:set_timeouts(2000, 3000, http_seconds * 1000)
+
+        local response, err = httpc:request({
+            path = informer.path,
+            query = watch_query(informer),
+            headers = {
+                ["Host"] = apiserver.host .. ":" .. apiserver.port,
+                ["Authorization"] = "Bearer " .. apiserver.token,
+                ["Accept"] = "application/json",
+                ["Connection"] = "keep-alive"
+            }
+        })
+
+        core.log.info("--raw=", informer.path, "?", watch_query(informer))
+
+        if err then
+            return false, "RequestError", err
+        end
+
+        if response.status ~= 200 then
+            return false, response.reason, response:read_body() or ""
+        end
+
+        local ok
+        -- incomplete event left over from the previous chunk
+        local remainder_body
+        local body
+        local reason
+
+        while true do
+            body, err = response.body_reader()
+            if err then
+                return false, "ReadBodyError", err
+            end
+
+            -- end of this watch response; start the next watch request
+            if not body then
+                break
+            end
+
+            if remainder_body and #remainder_body > 0 then
+                body = remainder_body .. body
+            end
+
+            ok, remainder_body, reason, err = split_event(body, dispatch_event, informer)
+            if not ok then
+                -- a gone resource version is reported as success, not as
+                -- a watch failure
+                if reason == "ResourceGone" then
+                    return true
+                end
+                return false, reason, err
+            end
+        end
+    end
+
+    return true
+end
+
+
+-- Run one full list-then-watch cycle for the informer against the given
+-- apiserver: connect, list all objects, then watch for changes.
+--
+-- informer.fetch_state is updated at every stage so the current progress or
+-- the failed stage can be inspected. The optional informer hooks `pre_list`
+-- and `post_list` are invoked right before and right after listing.
+--
+-- Returns true when the cycle completed, false when connecting, listing or
+-- watching failed (the failure is logged here).
+local function list_watch(informer, apiserver)
+    local ok
+    local reason, message
+    local httpc = http.new()
+
+    informer.fetch_state = "connecting"
+    core.log.info("begin to connect ", apiserver.host, ":", apiserver.port)
+
+    -- NOTE(review): certificate verification is disabled for this connection
+    ok, message = httpc:connect({
+        scheme = apiserver.schema,
+        host = apiserver.host,
+        port = apiserver.port,
+        ssl_verify = false
+    })
+
+    if not ok then
+        informer.fetch_state = "connect failed"
+        core.log.error("connect apiserver failed, apiserver.host: ", apiserver.host,
+                ", apiserver.port: ", apiserver.port, ", message : ", message)
+        return false
+    end
+
+    core.log.info("begin to list ", informer.kind)
+    informer.fetch_state = "listing"
+    -- the hook is registered under the lowercase name `pre_list`
+    if informer.pre_list then
+        informer:pre_list()
+    end
+
+    ok, reason, message = list(httpc, apiserver, informer)
+    if not ok then
+        informer.fetch_state = "list failed"
+        core.log.error("list failed, kind: ", informer.kind,
+                ", reason: ", reason, ", message : ", message)
+        return false
+    end
+
+    informer.fetch_state = "list finished"
+    -- the hook is registered under the lowercase name `post_list`
+    if informer.post_list then
+        informer:post_list()
+    end
+
+    core.log.info("begin to watch ", informer.kind)
+    informer.fetch_state = "watching"
+    ok, reason, message = watch(httpc, apiserver, informer)
+    if not ok then
+        informer.fetch_state = "watch failed"
+        core.log.error("watch failed, kind: ", informer.kind,
+                ", reason: ", reason, ", message : ", message)
+        return false
+    end
+
+    informer.fetch_state = "watch finished"
+
+    return true
+end
+
+local _M = {
+}
+
+-- Create an informer for one Kubernetes resource type.
+--
+-- group:     API group; nil or "" selects the core group ("/api/<version>")
+-- version:   API version, non-empty string (e.g. "v1")
+-- kind:      resource kind, non-empty string (e.g. "Endpoints")
+-- plural:    plural resource name used in the URL, non-empty string
+-- namespace: optional namespace; nil or "" means all namespaces
+--
+-- Returns the informer table, or nil and an error message when an argument
+-- has the wrong type or is empty.
+function _M.new(group, version, kind, plural, namespace)
+    local tp
+    tp = type(group)
+    if tp ~= "nil" and tp ~= "string" then
+        return nil, "group should set to string or nil type but " .. tp
+    end
+
+    tp = type(namespace)
+    if tp ~= "nil" and tp ~= "string" then
+        return nil, "namespace should set to string or nil type but " .. tp
+    end
+
+    tp = type(version)
+    if tp ~= "string" or version == "" then
+        return nil, "version should set to non-empty string"
+    end
+
+    tp = type(kind)
+    if tp ~= "string" or kind == "" then
+        return nil, "kind should set to non-empty string"
+    end
+
+    tp = type(plural)
+    if tp ~= "string" or plural == "" then
+        return nil, "plural should set to non-empty string"
+    end
+
+    local path = ""
+    if group == nil or group == "" then
+        path = path .. "/api/" .. version
+    else
+        path = path .. "/apis/" .. group .. "/" .. version
+    end
+
+    if namespace and namespace ~= "" then
+        -- the Kubernetes API path segment is the plural "namespaces"
+        path = path .. "/namespaces/" .. namespace
+    end
+    path = path .. "/" .. plural
+
+    return {
+        kind = kind,
+        list_kind = kind .. "List",
+        plural = plural,
+        path = path,
+        limit = 120,
+        label_selector = "",
+        field_selector = "",
+        overtime = "1800",
+        version = "",
+        continue = "",
+        list_watch = list_watch
+    }
+end
+
+return _M
diff --git a/apisix/discovery/kubernetes/init.lua b/apisix/discovery/kubernetes/init.lua
new file mode 100644
index 0000000..ba83588
--- /dev/null
+++ b/apisix/discovery/kubernetes/init.lua
@@ -0,0 +1,386 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ngx = ngx
+local ipairs = ipairs
+local pairs = pairs
+local string = string
+local tonumber = tonumber
+local tostring = tostring
+local os = os
+local error = error
+local pcall = pcall
+local process = require("ngx.process")
+local core = require("apisix.core")
+local util = require("apisix.cli.util")
+local local_conf = require("apisix.core.config_local").local_conf()
+local informer_factory = require("apisix.discovery.kubernetes.informer_factory")
+
+local endpoint_dict
+local default_weight
+
+local endpoint_lrucache = core.lrucache.new({
+    ttl = 300,
+    count = 1024
+})
+
+local endpoint_buffer = {}
+local empty_table = {}
+
+-- Ordering used for node lists: by host first, then by port for equal hosts.
+local function sort_nodes_cmp(left, right)
+    if left.host == right.host then
+        return left.port < right.port
+    end
+
+    return left.host < right.host
+end
+
+
+-- Informer callback for an added or modified Endpoints object.
+--
+-- Groups the endpoint's addresses by port (keyed by port name, falling back
+-- to targetPort and then to the port number), sorts every node list so the
+-- encoded content does not depend on the order the apiserver returned, and
+-- stores two entries in the shared dict:
+--   "<namespace>/<name>#version" - crc32 of the encoded content
+--   "<namespace>/<name>"         - JSON map of port name -> node list
+-- Objects whose namespace is rejected by informer.namespace_selector are
+-- ignored.
+local function on_endpoint_modified(informer, endpoint)
+    if informer.namespace_selector and
+            not informer:namespace_selector(endpoint.metadata.namespace) then
+        return
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+    core.table.clear(endpoint_buffer)
+
+    local subsets = endpoint.subsets
+    for _, subset in ipairs(subsets or empty_table) do
+        if subset.addresses then
+            local addresses = subset.addresses
+            for _, port in ipairs(subset.ports or empty_table) do
+                local port_name
+                if port.name then
+                    port_name = port.name
+                elseif port.targetPort then
+                    port_name = tostring(port.targetPort)
+                else
+                    port_name = tostring(port.port)
+                end
+
+                local nodes = endpoint_buffer[port_name]
+                if nodes == nil then
+                    -- nodes is filled as an array, so the size hint belongs
+                    -- to the array part (first argument)
+                    nodes = core.table.new(#subsets * #addresses, 0)
+                    endpoint_buffer[port_name] = nodes
+                end
+
+                for _, address in ipairs(addresses) do
+                    core.table.insert(nodes, {
+                        host = address.ip,
+                        port = port.port,
+                        weight = default_weight
+                    })
+                end
+            end
+        end
+    end
+
+    -- endpoint_buffer maps port name -> array of nodes: sort each array
+    -- itself, not the individual node records inside it
+    for _, nodes in pairs(endpoint_buffer) do
+        core.table.sort(nodes, sort_nodes_cmp)
+    end
+
+    local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
+    local endpoint_content = core.json.encode(endpoint_buffer, true)
+    local endpoint_version = ngx.crc32_long(endpoint_content)
+
+    local _, err
+    _, err = endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
+    if err then
+        core.log.error("set endpoint version into discovery DICT failed, ", err)
+        return
+    end
+    _, err = endpoint_dict:safe_set(endpoint_key, endpoint_content)
+    if err then
+        core.log.error("set endpoint into discovery DICT failed, ", err)
+        -- do not leave a version entry without its content
+        endpoint_dict:delete(endpoint_key .. "#version")
+    end
+end
+
+
+-- Informer callback for a deleted Endpoints object: removes both the
+-- "<namespace>/<name>#version" and the "<namespace>/<name>" entries from the
+-- shared dict, unless the namespace is rejected by the namespace selector.
+local function on_endpoint_deleted(informer, endpoint)
+    local metadata = endpoint.metadata
+
+    if informer.namespace_selector then
+        if not informer:namespace_selector(metadata.namespace) then
+            return
+        end
+    end
+
+    core.log.debug(core.json.delay_encode(endpoint))
+
+    local key = metadata.namespace .. "/" .. metadata.name
+    endpoint_dict:delete(key .. "#version")
+    endpoint_dict:delete(key)
+end
+
+
+-- Registered as the informer's `pre_list` hook in init_worker: flushes every
+-- item of the shared dict (per the ngx.shared.DICT docs, flush_all marks
+-- items as expired; lookups in this module use get_stale).
+local function pre_list(informer)
+    endpoint_dict:flush_all()
+end
+
+
+-- Registered as the informer's `post_list` hook in init_worker: drops the
+-- shared dict items that are expired at this point.
+local function post_list(informer)
+    endpoint_dict:flush_expired()
+end
+
+
+-- Copy the configured label selector (may be nil) onto the informer.
+local function setup_label_selector(conf, informer)
+    informer.label_selector = conf.label_selector
+end
+
+
+-- Translate conf.namespace_selector into informer settings.
+--
+--   equal / not_equal - sent to the apiserver as a field selector on
+--                       metadata.namespace; no client-side filter is set
+--   match / not_match - lists of regexes evaluated on the client side via
+--                       informer:namespace_selector(namespace), which returns
+--                       true when objects of that namespace are to be kept;
+--                       a regex counts only if it matches the whole name
+--
+-- Without a namespace_selector the informer keeps every namespace.
+local function setup_namespace_selector(conf, informer)
+    local ns = conf.namespace_selector
+    if ns == nil then
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.equal then
+        informer.field_selector = "metadata.namespace=" .. ns.equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.not_equal then
+        informer.field_selector = "metadata.namespace!=" .. ns.not_equal
+        informer.namespace_selector = nil
+        return
+    end
+
+    if ns.match then
+        informer.namespace_selector = function(self, namespace)
+            local match = conf.namespace_selector.match
+            local m, err
+            for _, v in ipairs(match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return true
+                end
+                if err then
+                    core.log.error("ngx.re.match failed: ", err)
+                end
+            end
+            return false
+        end
+        return
+    end
+
+    if ns.not_match then
+        informer.namespace_selector = function(self, namespace)
+            local not_match = conf.namespace_selector.not_match
+            local m, err
+            for _, v in ipairs(not_match) do
+                m, err = ngx.re.match(namespace, v, "j")
+                if m and m[0] == namespace then
+                    return false
+                end
+                if err then
+                    -- a broken regex rejects the namespace; log it, as the
+                    -- `match` branch does, so the drop is not silent
+                    core.log.error("ngx.re.match failed: ", err)
+                    return false
+                end
+            end
+            return true
+        end
+        return
+    end
+end
+
+
+-- Resolve a configuration value that may reference an environment variable.
+--
+-- A value of the form "${NAME}" is replaced by the content of the
+-- environment variable NAME; nil plus an error message is returned when the
+-- variable is not set. Any other value is returned unchanged.
+local function read_env(key)
+    -- the shortest possible reference, "${X}", is four characters long
+    if #key <= 3 then
+        return key
+    end
+
+    if string.sub(key, 1, 2) ~= "${" or string.sub(key, -1) ~= "}" then
+        return key
+    end
+
+    local name = string.sub(key, 3, #key - 1)
+    local value = os.getenv(name)
+    if value then
+        return value, nil
+    end
+
+    return nil, "not found environment variable " .. name
+end
+
+
+-- Build the apiserver connection settings from the discovery configuration.
+--
+-- conf.service.host, conf.service.port, conf.client.token and
+-- conf.client.token_file may each be a literal value or a "${ENV_NAME}"
+-- reference resolved through read_env(). The token is taken from
+-- conf.client.token when present, otherwise read from conf.client.token_file.
+--
+-- Returns a table {schema, host, port, token}, or nil and an error message
+-- when a value is missing or invalid.
+local function get_apiserver(conf)
+    local apiserver = {
+        schema = "",
+        host = "",
+        port = "",
+        token = ""
+    }
+
+    apiserver.schema = conf.service.schema
+    if apiserver.schema ~= "http" and apiserver.schema ~= "https" then
+        -- tostring() keeps the message intact when schema is nil
+        return nil, "service.schema should set to one of [http,https] but "
+                .. tostring(apiserver.schema)
+    end
+
+    local err
+    apiserver.host, err = read_env(conf.service.host)
+    if err then
+        return nil, err
+    end
+
+    if apiserver.host == "" then
+        return nil, "service.host should set to non-empty string"
+    end
+
+    local port
+    port, err = read_env(conf.service.port)
+    if err then
+        return nil, err
+    end
+
+    apiserver.port = tonumber(port)
+    if not apiserver.port or apiserver.port <= 0 or apiserver.port > 65535 then
+        -- tonumber() yields nil for non-numeric input, so report the raw
+        -- value instead of concatenating a nil
+        return nil, "invalid port value: " .. tostring(port)
+    end
+
+    if conf.client.token then
+        apiserver.token, err = read_env(conf.client.token)
+        if err then
+            return nil, err
+        end
+    elseif conf.client.token_file and conf.client.token_file ~= "" then
+        local file
+        file, err = read_env(conf.client.token_file)
+        if err then
+            return nil, err
+        end
+
+        apiserver.token, err = util.read_file(file)
+        if err then
+            return nil, err
+        end
+    else
+        return nil, "one of [client.token,client.token_file] should be set but none"
+    end
+
+    if apiserver.schema == "https" and apiserver.token == "" then
+        return nil, "apiserver.token should set to non-empty string when service.schema is https"
+    end
+
+    return apiserver
+end
+
+
+-- Loader used by the LRU cache in _M.nodes: reads the JSON content stored
+-- under endpoint_key in the shared dict, decodes it and returns the node
+-- list of endpoint_port. Returns nil (after logging an error) when the
+-- content is missing or cannot be decoded.
+local function create_endpoint_lrucache(endpoint_key, endpoint_port)
+    local raw_content = endpoint_dict:get_stale(endpoint_key)
+    if not raw_content then
+        core.log.error("get empty endpoint content from discovery DIC, this should not happen ",
+                endpoint_key)
+        return nil
+    end
+
+    local ports = core.json.decode(raw_content)
+    if ports then
+        return ports[endpoint_port]
+    end
+
+    core.log.error("decode endpoint content failed, this should not happen, content: ",
+            raw_content)
+    return nil
+end
+
+local _M = {
+    version = "0.0.1"
+}
+
+-- Look up the nodes of an upstream whose service_name has the form
+-- "namespace/name:port_name".
+--
+-- The part before the last ":" is the shared dict key, the part after it
+-- selects the port. The stored "#version" entry is passed to the LRU cache
+-- as the cache version. Returns nil when the name does not match the
+-- expected form or no version entry exists for the endpoint.
+function _M.nodes(service_name)
+    local captures = ngx.re.match(service_name, "^(.*):(.*)$", "jo")
+    if not captures then
+        core.log.info("get unexpected upstream service_name: ", service_name)
+        return nil
+    end
+
+    local endpoint_key, endpoint_port = captures[1], captures[2]
+
+    local version = endpoint_dict:get_stale(endpoint_key .. "#version")
+    if version then
+        return endpoint_lrucache(service_name, version,
+                create_endpoint_lrucache, endpoint_key, endpoint_port)
+    end
+
+    core.log.info("get empty endpoint version from discovery DICT ", endpoint_key)
+    return nil
+end
+
+
+-- Per-process initialisation of the kubernetes discovery module.
+--
+-- Every process binds the shared dict `discovery` and raises an error when
+-- it is missing. Only the "privileged agent" process goes on to build the
+-- apiserver settings and the Endpoints informer and to start the timer that
+-- runs list_watch in an endless retry loop.
+function _M.init_worker()
+    -- TODO: maybe we can read dict name from discovery config
+    endpoint_dict = ngx.shared.discovery
+    if not endpoint_dict then
+        error("failed to get nginx shared dict: discovery, please check your APISIX version")
+    end
+
+    -- all other process types only need endpoint_dict, which is bound above
+    if process.type() ~= "privileged agent" then
+        return
+    end
+
+    local discovery_conf = local_conf.discovery.kubernetes
+
+    default_weight = discovery_conf.default_weight or 50
+
+    local apiserver, err = get_apiserver(discovery_conf)
+    if err then
+        error(err)
+        return
+    end
+
+    -- core API group, version v1, all namespaces
+    local endpoints_informer, err = informer_factory.new("", "v1",
+            "Endpoints", "endpoints", "")
+    if err then
+        error(err)
+        return
+    end
+
+    setup_namespace_selector(discovery_conf, endpoints_informer)
+    setup_label_selector(discovery_conf, endpoints_informer)
+
+    -- added and modified objects are handled by the same callback
+    endpoints_informer.on_added = on_endpoint_modified
+    endpoints_informer.on_modified = on_endpoint_modified
+    endpoints_informer.on_deleted = on_endpoint_deleted
+    endpoints_informer.pre_list = pre_list
+    endpoints_informer.post_list = post_list
+
+    local timer_runner
+    timer_runner = function(premature)
+        if premature then
+            return
+        end
+
+        -- pcall so that a runtime error inside list_watch cannot end the
+        -- retry loop
+        local ok, status = pcall(endpoints_informer.list_watch, endpoints_informer, apiserver)
+
+        -- restart immediately after a clean cycle, wait 40 seconds after a
+        -- runtime error or a failed cycle
+        local retry_interval = 0
+        if not ok then
+            core.log.error("list_watch failed, kind: ", endpoints_informer.kind,
+                    ", reason: ", "RuntimeException", ", message : ", status)
+            retry_interval = 40
+        elseif not status then
+            retry_interval = 40
+        end
+
+        ngx.timer.at(retry_interval, timer_runner)
+    end
+
+    ngx.timer.at(0, timer_runner)
+end
+
+return _M
diff --git a/apisix/discovery/kubernetes/schema.lua b/apisix/discovery/kubernetes/schema.lua
new file mode 100644
index 0000000..4888de6
--- /dev/null
+++ b/apisix/discovery/kubernetes/schema.lua
@@ -0,0 +1,140 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- Accepted forms of service.host: a "${ENV_NAME}" reference or a host made
+-- of dot-separated labels of lowercase letters, digits and "-".
+local host_patterns = {
+    { pattern = [[^\${[_A-Za-z]([_A-Za-z0-9]*[_A-Za-z])*}$]] },
+    { pattern = [[^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$]] },
+}
+
+-- Accepted forms of service.port: a "${ENV_NAME}" reference or a decimal
+-- port number in the range 1-65535.
+local port_patterns = {
+    { pattern = [[^\${[_A-Za-z]([_A-Za-z0-9]*[_A-Za-z])*}$]] },
+    { pattern = [[^(([1-9]\d{0,3}|[1-5]\d{4}|6[0-4]\d{3}|65[0-4]\d{2}|655[0-2]\d|6553[0-5]))$]] },
+}
+
+-- literal namespace name, used by namespace_selector.equal / not_equal
+local namespace_pattern = [[^[a-z0-9]([-a-z0-9_.]*[a-z0-9])?$]]
+-- regex source made of printable, non-space ASCII characters, used by the
+-- items of namespace_selector.match / not_match
+local namespace_regex_pattern = [[^[\x21-\x7e]*$]]
+
+-- JSON schema of the `discovery.kubernetes` configuration section.
+return {
+    type = "object",
+    properties = {
+        -- where to reach the apiserver
+        service = {
+            type = "object",
+            properties = {
+                schema = {
+                    type = "string",
+                    enum = { "http", "https" },
+                    default = "https",
+                },
+                host = {
+                    type = "string",
+                    default = "${KUBERNETES_SERVICE_HOST}",
+                    oneOf = host_patterns,
+                },
+                port = {
+                    type = "string",
+                    default = "${KUBERNETES_SERVICE_PORT}",
+                    oneOf = port_patterns,
+                },
+            },
+            default = {
+                schema = "https",
+                host = "${KUBERNETES_SERVICE_HOST}",
+                port = "${KUBERNETES_SERVICE_PORT}",
+            }
+        },
+        -- credentials: exactly one of token / token_file
+        client = {
+            type = "object",
+            properties = {
+                token = {
+                    type = "string",
+                    oneOf = {
+                        -- NOTE(review): unlike the host/port patterns this
+                        -- env-reference pattern has no leading "^" - confirm
+                        -- whether that is intended
+                        { pattern = [[\${[_A-Za-z]([_A-Za-z0-9]*[_A-Za-z])*}$]] },
+                        { pattern = [[^[A-Za-z0-9+\/._=-]{0,4096}$]] },
+                    },
+                },
+                token_file = {
+                    type = "string",
+                    pattern = [[^[^\:*?"<>|]*$]],
+                    minLength = 1,
+                    maxLength = 500,
+                }
+            },
+            oneOf = {
+                { required = { "token" } },
+                { required = { "token_file" } },
+            },
+            default = {
+                token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+            }
+        },
+        -- weight assigned to every discovered node
+        default_weight = {
+            type = "integer",
+            default = 50,
+            minimum = 0,
+        },
+        -- optional restriction of the watched namespaces
+        namespace_selector = {
+            type = "object",
+            properties = {
+                equal = {
+                    type = "string",
+                    pattern = namespace_pattern,
+                },
+                not_equal = {
+                    type = "string",
+                    pattern = namespace_pattern,
+                },
+                match = {
+                    type = "array",
+                    items = {
+                        type = "string",
+                        pattern = namespace_regex_pattern
+                    },
+                    minItems = 1
+                },
+                not_match = {
+                    type = "array",
+                    items = {
+                        type = "string",
+                        pattern = namespace_regex_pattern
+                    },
+                    minItems = 1
+                },
+            },
+            oneOf = {
+                { required = { } },
+                { required = { "equal" } },
+                { required = { "not_equal" } },
+                { required = { "match" } },
+                { required = { "not_match" } }
+            },
+        },
+        -- optional label selector, copied onto the informer as-is
+        label_selector = {
+            type = "string",
+        }
+    },
+    -- defaults applied when the whole section is left unset
+    default = {
+        service = {
+            schema = "https",
+            host = "${KUBERNETES_SERVICE_HOST}",
+            port = "${KUBERNETES_SERVICE_PORT}",
+        },
+        client = {
+            token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token"
+        },
+        default_weight = 50
+    }
+}
diff --git a/ci/kubernetes-ci.sh b/ci/kubernetes-ci.sh
new file mode 100755
index 0000000..839929b
--- /dev/null
+++ b/ci/kubernetes-ci.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+. ./ci/common.sh
+
+run_case() {
+    # export_or_prefix is not defined in this script; presumably it comes from
+    # ci/common.sh, which is sourced above.
+    export_or_prefix
+    # Prepend the current directory to Perl's module search path.
+    export PERL5LIB=".:${PERL5LIB}"
+    # Run everything under t_kubernetes recursively. tee keeps a copy of the
+    # output in test-result, which is then handed to rerun_flaky_tests.
+    prove -Itest-nginx/lib -I./ -r t_kubernetes \
+        | tee test-result
+    rerun_flaky_tests test-result
+}
+
+# Dispatch on the first command-line argument. run_case is the only
+# supported sub-command.
+case_opt=${1:-}
+case "$case_opt" in
+    (run_case)
+        run_case
+        ;;
+    (*)
+        # A missing or misspelled sub-command used to fall through and exit 0
+        # without running a single test; fail loudly instead.
+        echo "usage: $0 run_case (got: '${case_opt}')" >&2
+        exit 1
+        ;;
+esac
diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json
index cf5a3a1..14e185c 100644
--- a/docs/zh/latest/config.json
+++ b/docs/zh/latest/config.json
@@ -189,7 +189,8 @@
             "discovery",
             "discovery/dns",
             "discovery/nacos",
-            "discovery/eureka"
+            "discovery/eureka",
+            "discovery/kubernetes"
           ]
         },
         {
diff --git a/docs/zh/latest/discovery/kubernetes.md b/docs/zh/latest/discovery/kubernetes.md
new file mode 100644
index 0000000..9ef14ff
--- /dev/null
+++ b/docs/zh/latest/discovery/kubernetes.md
@@ -0,0 +1,120 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# 基于 Kubernetes 的服务发现
+
+Kubernetes 服务发现插件以 ListWatch 方式监听 Kubernetes 集群 v1.endpoints 的实时变化,
+并将其值存储在 ngx.shared.dict 中, 同时遵循 APISIX Discovery 规范提供查询接口
+
+# Kubernetes 服务发现插件的配置
+
+Kubernetes 服务发现插件的样例配置如下:
+
+```yaml
+discovery:
+  kubernetes:
+    service:
+      # apiserver schema, options [http, https]
+      schema: https #default https
+
+      # apiserver host, options [ipv4, ipv6, domain, environment variable]
+      host: ${KUBERNETES_SERVICE_HOST} #default ${KUBERNETES_SERVICE_HOST}
+
+      # apiserver port, options [port number, environment variable]
+      port: ${KUBERNETES_SERVICE_PORT}  #default ${KUBERNETES_SERVICE_PORT}
+
+    client:
+      # serviceaccount token or token_file
+      token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
+
+      #token: |-
+       # eyJhbGciOiJSUzI1NiIsImtpZCI6Ikx5ME1DNWdnbmhQNkZCNlZYMXBsT3pYU3BBS2swYzBPSkN3ZnBESGpkUEEif
+       # 6Ikx5ME1DNWdnbmhQNkZCNlZYMXBsT3pYU3BBS2swYzBPSkN3ZnBESGpkUEEifeyJhbGciOiJSUzI1NiIsImtpZCI
+
+    # the kubernetes discovery plugin supports namespace_selector
+    # use exactly one of [equal, not_equal, match, not_match] to filter namespaces
+    namespace_selector:
+      # only save endpoints with namespace equal default
+      equal: default
+
+      # only save endpoints with namespace not equal default
+      #not_equal: default
+
+      # only save endpoints with namespace match one of [default, ^my-[a-z]+$]
+      #match:
+       #- default
+       #- ^my-[a-z]+$
+
+      # only save endpoints with namespace not match one of [default, ^my-[a-z]+$]
+      #not_match:
+       #- default
+       #- ^my-[a-z]+$
+
+    # the kubernetes discovery plugin supports label_selector
+    # for the expression of label_selector, please refer to https://kubernetes.io/docs/concepts/overview/working-with-objects/labels
+    label_selector: |-
+      first="a",second="b"
+```
+
+如果 Kubernetes 服务发现插件运行在 Pod 内, 你可以使用最简配置:
+
+```yaml
+discovery:
+  kubernetes: { }
+```
+
+如果 Kubernetes 服务发现插件运行在 Pod 外, 你需要新建或选取指定的 ServiceAccount, 获取其 Token 值, 并使用如下配置:
+
+```yaml
+discovery:
+  kubernetes:
+    service:
+      schema: https
+      host: # enter apiserver host value here
+      port: # enter apiserver port value here
+    client:
+      token: # enter serviceaccount token value here
+      #token_file: # enter file path here
+```
+
+# Kubernetes 服务发现插件的使用
+
+Kubernetes 服务发现插件提供与其他服务发现插件相同的查询接口 -> nodes(service_name) \
+service_name 的 pattern 如下:
+> _[namespace]/[name]:[portName]_
+
+如果 Kubernetes Endpoint 没有定义 portName, Kubernetes 服务发现插件会依次使用 targetPort, port 代替
+
+# Q&A
+
+> Q: 为什么只支持配置 token 来访问 Kubernetes ApiServer \
+> A: 通常情况下,我们会使用三种方式与 Kubernetes ApiServer 通信 :
+>
+>+ mTLS
+>+ token
+>+ basic authentication
+>
+> 因为 lua-resty-http 目前不支持 mTLS, 以及 basic authentication 不被推荐使用,\
+> 所以当前只实现了 token 认证方式
+
+-------
+
+> Q: APISIX 是多进程模型, 是否意味着每个 APISIX 工作进程都会监听 Kubernetes v1.endpoints \
+> A: Kubernetes 服务发现插件只使用特权进程监听 Kubernetes v1.endpoints, 然后将结果存储\
+> 在 ngx.shared.dict 中, 业务进程是通过查询 ngx.shared.dict 来获取结果的
diff --git a/t_kubernetes/configs/account.yaml b/t_kubernetes/configs/account.yaml
new file mode 100644
index 0000000..da7cf01
--- /dev/null
+++ b/t_kubernetes/configs/account.yaml
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Service account named apisix-test in the default namespace.
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: apisix-test
+  namespace: default
+---
+# Cluster-wide read-only access (get/list/watch) to core-group endpoints.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: apisix-test
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - endpoints
+    verbs:
+      - get
+      - list
+      - watch
+---
+# Binds the apisix-test ClusterRole to the apisix-test service account.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: apisix-test
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: apisix-test
+subjects:
+  - kind: ServiceAccount
+    name: apisix-test
+    namespace: default
diff --git a/t_kubernetes/configs/endpoint.yaml b/t_kubernetes/configs/endpoint.yaml
new file mode 100644
index 0000000..885f825
--- /dev/null
+++ b/t_kubernetes/configs/endpoint.yaml
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Three namespaces (ns-a, ns-b, ns-c), each holding an Endpoints object
+# named "ep" that starts out with no subsets.
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: ns-a
+---
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: ep
+  namespace: ns-a
+subsets: []
+---
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: ns-b
+---
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: ep
+  namespace: ns-b
+subsets: []
+---
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: ns-c
+---
+apiVersion: v1
+kind: Endpoints
+metadata:
+  name: ep
+  namespace: ns-c
+subsets: []
+---
diff --git a/t_kubernetes/configs/kind.yaml b/t_kubernetes/configs/kind.yaml
new file mode 100644
index 0000000..3db903f
--- /dev/null
+++ b/t_kubernetes/configs/kind.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# kind cluster definition: the API server is exposed on 127.0.0.1:6443.
+apiVersion: kind.x-k8s.io/v1alpha4
+kind: Cluster
+networking:
+  apiServerAddress: 127.0.0.1
+  apiServerPort: 6443
diff --git a/t_kubernetes/discovery/kubernetes.t b/t_kubernetes/discovery/kubernetes.t
new file mode 100644
index 0000000..9d44e6a
--- /dev/null
+++ b/t_kubernetes/discovery/kubernetes.t
@@ -0,0 +1,714 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Compile-time setup: locate a readable service-account token and publish the
+# package globals ($::yaml_config, $::token_file, $::token_value,
+# $::scale_ns_c) that the preprocessor and the test blocks below refer to.
+BEGIN {
+    # First candidate: the standard in-pod service-account token path.
+    my $token_var_file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
+    # An unreadable file yields an empty string (stderr is discarded).
+    my $token_from_var = eval {`cat $token_var_file 2>/dev/null`};
+    if ($token_from_var) {
+
+        our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes: {}
+_EOC_
+        our $token_file = $token_var_file;
+        our $token_value = $token_from_var;
+
+    }
+
+    # Second candidate: the same path under /tmp. This check runs after the
+    # first one, so when both files are readable these values win.
+    my $token_tmp_file = "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token";
+    my $token_from_tmp = eval {`cat $token_tmp_file 2>/dev/null`};
+    if ($token_from_tmp) {
+
+        our $yaml_config = <<_EOC_;
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: /tmp/var/run/secrets/kubernetes.io/serviceaccount/token
+_EOC_
+        our $token_file = $token_tmp_file;
+        our $token_value = $token_from_tmp;
+    }
+
+    # JSON body for POST /operators: replaces the subsets of ns-c/ep with a
+    # single address (10.0.0.1) exposing the named port p1 (5001).
+    our $scale_ns_c = <<_EOC_;
+[
+  {
+    "op": "replace_subsets",
+    "name": "ep",
+    "namespace": "ns-c",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p1",
+            "port": 5001
+          }
+        ]
+      }
+    ]
+  }
+]
+_EOC_
+
+}
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+log_level('debug');
+no_root_location();
+no_shuffle();
+workers(4);
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $apisix_yaml = $block->apisix_yaml // <<_EOC_;
+routes: []
+#END
+_EOC_
+
+    $block->set_value("apisix_yaml", $apisix_yaml);
+
+    my $main_config = $block->main_config // <<_EOC_;
+env KUBERNETES_SERVICE_HOST=127.0.0.1;
+env KUBERNETES_SERVICE_PORT=6443;
+env KUBERNETES_CLIENT_TOKEN=$::token_value;
+env KUBERNETES_CLIENT_TOKEN_FILE=$::token_file;
+_EOC_
+
+    $block->set_value("main_config", $main_config);
+
+    my $config = $block->config // <<_EOC_;
+        location /queries {
+            content_by_lua_block {
+                -- Reports how many nodes the kubernetes discovery module
+                -- returns for each requested service. The request body is a
+                -- JSON array of service names; the reply lists one count per
+                -- name, in request order, separated by spaces.
+                local core = require("apisix.core")
+                local discovery = require("apisix.discovery.kubernetes")
+
+                -- wait one second before querying the discovery module
+                ngx.sleep(1)
+
+                ngx.req.read_body()
+                local payload = ngx.req.get_body_data()
+                local service_names = core.json.decode(payload)
+
+                local parts = { "{" }
+                for _, service_name in ipairs(service_names) do
+                    local nodes = discovery.nodes(service_name)
+                    -- a missing node list counts as zero nodes
+                    local count = nodes and #nodes or 0
+                    parts[#parts + 1] = " " .. count
+                end
+                parts[#parts + 1] = " }"
+                ngx.say(table.concat(parts))
+            }
+        }
+
+        location /operators {
+            content_by_lua_block {
+                -- Test helper: applies a JSON array of operations to the
+                -- Kubernetes API reached over plain HTTP at 127.0.0.1:6445.
+                -- Supported ops are replace_subsets and replace_labels; both
+                -- are sent as JSON-Patch requests against the named Endpoints
+                -- object. Any other op is logged and skipped.
+                -- Replies DONE when every op succeeded. On failure it logs
+                -- the reason, sets a non-2xx status and replies FAILED, so
+                -- the calling test cannot pass silently.
+                local http = require("resty.http")
+                local core = require("apisix.core")
+                local ipairs = ipairs
+                local tostring = tostring
+
+                local api_host = "127.0.0.1"
+                local api_port = 6445
+                local api_addr = api_host .. ":" .. api_port
+
+                -- Log the reason and answer with the given status.
+                -- The status must be set before any output is sent.
+                local function fail(status, reason)
+                    core.log.error(reason)
+                    ngx.status = status
+                    ngx.say("FAILED")
+                end
+
+                ngx.req.read_body()
+                local request_body = ngx.req.get_body_data()
+                local operators = core.json.decode(request_body)
+                if type(operators) ~= "table" then
+                    return fail(400, "request body is not a JSON array of operators")
+                end
+
+                core.log.info("get body ", request_body)
+                core.log.info("get operators ", #operators)
+                for _, op in ipairs(operators) do
+                    local path, body
+
+                    if op.op == "replace_subsets" then
+                        path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
+                        if #op.subsets == 0 then
+                            -- hand-written so the empty list is sent as a JSON array
+                            body = '[{"path":"/subsets","op":"replace","value":[]}]'
+                        else
+                            local t = { { op = "replace", path = "/subsets", value = op.subsets } }
+                            body = core.json.encode(t, true)
+                        end
+                    elseif op.op == "replace_labels" then
+                        path = "/api/v1/namespaces/" .. op.namespace .. "/endpoints/" .. op.name
+                        local t = { { op = "replace", path = "/metadata/labels", value = op.labels } }
+                        body = core.json.encode(t, true)
+                    end
+
+                    if path then
+                        local httpc = http.new()
+                        core.log.info("begin to connect ", api_addr)
+                        local ok, message = httpc:connect({
+                            scheme = "http",
+                            host = api_host,
+                            port = api_port,
+                        })
+                        if not ok then
+                            return fail(500, "connect " .. api_addr .. " failed, message : "
+                                             .. tostring(message))
+                        end
+
+                        local res, err = httpc:request({
+                            method = "PATCH",
+                            path = path,
+                            headers = {
+                                ["Host"] = api_addr,
+                                ["Content-Type"] = "application/json-patch+json",
+                            },
+                            body = body,
+                        })
+                        local status = res and res.status
+                        -- one connection per op; release it before the next one
+                        httpc:close()
+
+                        if not res then
+                            return fail(500, "operator k8s cluster error: " .. tostring(err))
+                        end
+                        if status ~= 200 and status ~= 201 and status ~= 409 then
+                            return fail(status, "operator k8s cluster returned status "
+                                                .. tostring(status))
+                        end
+                    else
+                        -- ops without a handler here produce no API request
+                        core.log.info("skip unsupported operator ", tostring(op.op))
+                    end
+                end
+                ngx.say("DONE")
+            }
+        }
+
+_EOC_
+
+    $block->set_value("config", $config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: create namespace and endpoints
+--- yaml_config eval: $::yaml_config
+--- request
+POST /operators
+[
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-a",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p1",
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p2",
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "op": "create_namespace",
+    "name": "ns-b"
+  },
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-b",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p1",
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "name": "p2",
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "op": "create_namespace",
+    "name": "ns-c"
+  },
+  {
+    "op": "replace_subsets",
+    "namespace": "ns-c",
+    "name": "ep",
+    "subsets": [
+      {
+        "addresses": [
+          {
+            "ip": "10.0.0.1"
+          },
+          {
+            "ip": "10.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "port": 5001
+          }
+        ]
+      },
+      {
+        "addresses": [
+          {
+            "ip": "20.0.0.1"
+          },
+          {
+            "ip": "20.0.0.2"
+          }
+        ],
+        "ports": [
+          {
+            "port": 5002
+          }
+        ]
+      }
+    ]
+  }
+]
+--- more_headers
+Content-type: application/json
+--- error_code: 200
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: use default parameters
+--- yaml_config eval: $::yaml_config
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: use specify parameters
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    service:
+      host: "127.0.0.1"
+      port: "6443"
+    client:
+      token: "${KUBERNETES_CLIENT_TOKEN}"
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: use specify environment parameters
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    service:
+      host: ${KUBERNETES_SERVICE_HOST}
+      port: ${KUBERNETES_SERVICE_PORT}
+    client:
+      token: ${KUBERNETES_CLIENT_TOKEN}
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: use token_file
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 6: use http
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    service:
+      schema: http
+      host: "127.0.0.1"
+      port: "6445"
+    client:
+      token: ""
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 7: use namespace selector equal
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      equal: ns-a
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 0 0 0 0 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: use namespace selector not_equal
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      not_equal: ns-a
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 0 0 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 9: use namespace selector match
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      match: [ns-a,ns-b]
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 0 0 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: use namespace selector match with regex
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      match: ["ns-[ab]"]
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 2 2 2 2 0 0 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 11: use namespace selector not_match
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      not_match: ["ns-a"]
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 0 0 2 2 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 12: use namespace selector not_match with regex
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    namespace_selector:
+      not_match: ["ns-[ab]"]
+--- request
+GET /queries
+["ns-a/ep:p1","ns-a/ep:p2","ns-b/ep:p1","ns-b/ep:p2","ns-c/ep:5001","ns-c/ep:5002"]
+--- more_headers
+Content-type: application/json
+--- response_body eval
+qr{ 0 0 0 0 2 2 }
+--- no_error_log
+[error]
+
+
+
+=== TEST 13: use label selector
+--- yaml_config
+apisix:
+  node_listen: 1984
+  config_center: yaml
+  enable_admin: false
+discovery:
+  kubernetes:
+    client:
+      token_file: ${KUBERNETES_CLIENT_TOKEN_FILE}
+    label_selector: |-
+       first=1,second
+--- request eval
+[
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-a\",\"labels\":{}}]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-b\",\"labels\":{}}]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-c\",\"labels\":{}}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-a\",\"labels\":{\"first\":\"1\" }}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-b\",\"labels\":{\"first\":\"1\",\"second\":\"o\" }}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-c\",\"labels\":{\"first\":\"2\",\"second\":\"o\" }}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-c\",\"labels\":{\"first\":\"1\" }}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+"POST /operators
+[{\"op\":\"replace_labels\",\"name\":\"ep\",\"namespace\":\"ns-c\",\"labels\":{\"first\":\"1\",\"second\":\"o\" }}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-b/ep:p1\",\"ns-c/ep:5001\"]",
+
+]
+--- response_body eval
+[
+    "DONE\n",
+    "DONE\n",
+    "DONE\n",
+    "{ 0 0 0 }\n",
+    "DONE\n",
+    "{ 0 0 0 }\n",
+    "DONE\n",
+    "{ 0 2 0 }\n",
+    "DONE\n",
+    "{ 0 2 0 }\n",
+    "DONE\n",
+    "{ 0 2 0 }\n",
+    "DONE\n",
+    "{ 0 2 2 }\n",
+]
+--- no_error_log
+[error]
+
+
+
+=== TEST 14: scale endpoints
+--- yaml_config eval: $::yaml_config
+--- request eval
+[
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-a/ep:p2\"]",
+
+"POST /operators
+[{\"op\":\"replace_subsets\",\"name\":\"ep\",\"namespace\":\"ns-a\",\"subsets\":[]}]",
+
+"GET /queries
+[\"ns-a/ep:p1\",\"ns-a/ep:p2\"]",
+
+"GET /queries
+[\"ns-c/ep:5001\",\"ns-c/ep:5002\",\"ns-c/ep:p1\"]",
+
+"POST /operators
+$::scale_ns_c",
+
+"GET /queries
+[\"ns-c/ep:5001\",\"ns-c/ep:5002\",\"ns-c/ep:p1\"]",
+
+]
+--- response_body eval
+[
+    "{ 2 2 }\n",
+    "DONE\n",
+    "{ 0 0 }\n",
+    "{ 2 2 0 }\n",
+    "DONE\n",
+    "{ 0 0 1 }\n",
+]
+--- no_error_log
+[error]