Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2021/06/08 01:42:51 UTC

[GitHub] [apisix] adugeek opened a new issue #4388: Another implementation of ingress controller

adugeek opened a new issue #4388:
URL: https://github.com/apache/apisix/issues/4388


   Although there is an official implementation of the Ingress Controller, it is not very practical in some scenarios.
   
   1: Small-scale clusters. If your cluster only has 1~2 nodes, you cannot provide a stable etcd deployment environment. Even if you manage to deploy it somehow, it consumes additional resources.
   
   2: A typical private k8s cluster does not have a stable external storage service. Even when the number of nodes is sufficient, it is not convenient to deploy etcd.
   
   3: A multi-tenant cluster isolated by namespace. Introducing the Ingress Controller as a single cluster entry point breaks tenant isolation, and even creating a new Ingress Controller Class for each tenant is not appropriate.
   
   Based on the above reasons, I designed a new implementation of Ingress Controller.
   The main points of implementation are as follows:
   
   1: Define CRDs on the cluster
   2: Deploy an apisix service in each namespace of the cluster, serving as (and only as) the traffic entrance of that namespace
   3: Each apisix instance uses the privileged process to list-watch the namespaced CRD resources and then writes them to conf/apisix.yaml (a rough sketch follows at the end of this message)
   4: Implement k8s discovery (list-watch the namespaced endpoints)
   
   This way, no additional development language or framework is needed,
   no additional etcd is needed, and the data transfer process is reduced.
   It is more robust than the official implementation.
   
   The only implementation difficulty may be how to implement webhooks.
   If we develop them with client-go, it is not convenient to verify the config schema and plugin schemas.
   If we use an apisix plugin, it is not convenient to verify the uniqueness of service_id, upstream_id and route_id because of the nginx process model.
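   
   To make point 3 concrete, here is a minimal sketch (not the actual implementation) of a privileged agent flushing list-watched objects into conf/apisix.yaml for the standalone config provider; the names routes_cache, flush_interval and flush_to_yaml are illustrative assumptions only:
   
   ```lua
   -- Hypothetical sketch: periodically dump routes collected by a CRD
   -- list-watch loop into conf/apisix.yaml (standalone / yaml config mode).
   local core = require("apisix.core")
   local ngx = ngx
   
   local routes_cache = {}   -- would be filled by the CRD list-watch loop
   local flush_interval = 3  -- seconds, an arbitrary value for this sketch
   
   local function flush_to_yaml(premature)
       if premature then
           return
       end
   
       -- JSON is a subset of YAML, so a JSON dump is acceptable here; the
       -- standalone config provider expects the file to end with "#END".
       local body = core.json.encode({routes = routes_cache}, true)
       local file, err = io.open(ngx.config.prefix() .. "conf/apisix.yaml", "w")
       if not file then
           core.log.error("failed to open conf/apisix.yaml: ", err)
           return
       end
       file:write(body, "\n#END\n")
       file:close()
   end
   
   local _M = {}
   
   function _M.init_worker()
       -- run only in the privileged agent, as in point 3
       ngx.timer.every(flush_interval, flush_to_yaml)
   end
   
   return _M
   ```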


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tao12345666333 commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tao12345666333 commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-887230920


   > The k8s endpoints discovery is very useful, I think it should be merged into apisix
   
   I agree. @adugeek would you like to add a `kubernetes` discovery plugin for Apache APISIX?
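   
   For context, an APISIX discovery module only needs to expose an init_worker() hook and a nodes(service_name) function returning a list of {host, port, weight} nodes; the k8s code posted in this thread follows exactly that shape. A minimal sketch with placeholder data:
   
   ```lua
   -- Sketch of the discovery-module interface only; the host/port values are
   -- placeholders, not a working kubernetes discovery.
   local core = require("apisix.core")
   
   local _M = {version = 0.1}
   
   function _M.init_worker()
       -- a real kubernetes discovery would start its Endpoints list-watch
       -- loop here (typically only in the privileged agent)
   end
   
   -- service_name comes from the upstream definition, e.g. "my-svc:http"
   function _M.nodes(service_name)
       core.log.info("resolving nodes for ", service_name)
       return {
           {host = "10.0.0.1", port = 8080, weight = 50},
       }
   end
   
   return _M
   ```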


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too-frequent endpoints changes will make the disk I/O heavy.
   
   I understand your concerns.
   But what I mean is that endpoint changes are sensed by the k8s discovery and written to ngx.shared.DICT, not to apisix.yaml.
   There is no disk I/O here.
   
   
   Talk is cheap. Let's look at the code
   
   ```lua
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local os = os
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local signal = require("resty.signal")
   local ngx_timer_at = ngx.timer.at
   local shared_endpoints = ngx.shared.discovery
   
   local apiserver_host = ""
   local apiserver_port = ""
   local namespace = ""
   local token = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
   
   local cache_table = {}
   
   local function end_world(reason)
       core.log.emerg(reason)
       signal.kill(process.get_master_pid(), signal.signum("QUIT"))
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       core.table.clear(cache_table)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           cache_table[port.name] = nodes
       end
   
       local _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       if err then
           core.log.emerg("set endpoint version into discovery DICT failed, ", err)
       end

       -- capture the result of the second write too, otherwise its error is silently lost
       _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
       if err then
           core.log.emerg("set endpoint into discovery DICT failed, ", err)
       end
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name .. "#version")
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       core.table.clear(cache_table)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           cache_table[port.name] = nodes
       end
   
       local _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       if err then
           core.log.emerg("set endpoints version into discovery DICT failed, ", err)
       end

       -- capture the result of the second write too, otherwise its error is silently lost
       _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
       if err then
           core.log.emerg("set endpoints into discovery DICT failed, ", err)
       end
   end
   
   local endpoint_resource = {
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
       max_resource_version = 0,
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=45"
           else
               return "limit=45&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                      self.max_resource_version)
       end,
       pre_list_callback = function(self)
           self.max_resource_version = 0
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end
   }
   
   local function event_dispatch(resource, event, object, drive)
       if drive == "watch" then
           local resource_version = object.metadata.resourceVersion
           local rvv = tonumber(resource_version)
           if rvv <= resource.max_resource_version then
               return
           end
           resource.max_resource_version = rvv
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
           resource:deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because max_resource_version has already been recorded above
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(continue),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res:read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       resource.max_resource_version = tonumber(resource_version)
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "list")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           return list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 1800 + math.random(60, 1200)
       local allowance_seconds = 120
       httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) * 1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res:read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
           -- reset the per-chunk match counter; otherwise offsets from the
           -- previous chunk would leak into this one
           capturedSize = 0
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watch")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       -- tail call so that the status of the re-established watch propagates to the caller
       return watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local ok = false
           local reason, message = "", ""
           local intervalTime = 0
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
               if not ok then
                   resource.watch_state = "connecting"
                   core.log.error("connect apiserver failed , apiserver_host: ", apiserver_host, "apiserver_port",
                       apiserver_port, "message : ", message)
                   intervalTime = 200
                   break
               end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed , resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 200
                   break
               end
               resource.watch_state = "list finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watch failed"
                   core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watch finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function create_lrucache(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       if not endpoint then
           core.log.error("get emppty endpoint from discovery DICT,this should not happen ", service_name)
           return nil
       end
   
       local t, _ = core.json.decode(endpoint)
       if not t then
           core.log.error("json decode endpoint failed, this should not happen, content : ", endpoint)
       end
       return t[port_name]
   end
   
   local _M = {
       version = 0.01
   }
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
       local match, _ = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.info("get unexpected upstream service_name: ", service_name)
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, err = shared_endpoints:get_stale(k8s_service_name .. "#version")
       if not version then
           core.log.info("get emppty endpoint version from discovery DICT ", k8s_service_name)
           return nil
       end
       return lrucache(service_name, version, create_lrucache, k8s_service_name, k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileg,med agent" then
           return
       end
   
       local err
       namespace, err = util.read_file("/home/adugeek/Temp/namespace")
       if not namespace or namespace == "" then
           end_world("get empty namespace value " .. (err or ""))
           return
       end
   
       token, err = util.read_file("/home/adugeek/Temp/token")
       if not token or token == "" then
           end_world("get empty token value " .. (err or ""))
           return
       end
   
       apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
       if not apiserver_host or apiserver_host == "" then
           end_world("get empty KUBERNETES_SERVICE_HOST value")
       end
   
       apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
       if not apiserver_port or apiserver_port == "" then
           end_world("get empty KUBERNETES_SERVICE_PORT value")
       end
   
       ngx_timer_at(0, fetch_resource, endpoint_resource)
   end
   
   return _M
   
   
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too-frequent endpoints changes will make the disk I/O heavy.
   
   I understand your concerns.
   But what I mean is that endpoint changes are sensed by the k8s discovery and written to ngx.shared.DICT, not to apisix.yaml.
   There is no disk I/O here.
   
   
   Talk is cheap. Let's look at the code
   
   ```lua
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local ngx_timer_at = ngx.timer.at
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local shared_endpoints = ngx.shared.discovery
   local signal = require("resty.signal")
   local os = os
   
   local apiserver_host = ""
   local apiserver_port = ""
   local namespace = ""
   local token = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
   
   local cache_table = {}
   
   local function end_world(reason)
       core.log.emerg(reason)
       signal.kill(process.get_master_pid(), signal.signum("QUIT"))
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       core.table.clear(cache_table)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           cache_table[port.name] = nodes
       end
   
       local _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       if err then
           core.log.emerg("set endpoints version into discovery DICT failed, ", err)
       end

       -- capture the result of the second write too, otherwise its error is silently lost
       _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
       if err then
           core.log.emerg("set endpoints into discovery DICT failed, ", err)
       end
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name .. "#version")
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       core.table.clear(cache_table)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           cache_table[port.name] = nodes
       end
   
       local _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       if err then
           core.log.emerg("set endpoints version into discovery DICT failed, ", err)
       end

       -- capture the result of the second write too, otherwise its error is silently lost
       _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
       if err then
           core.log.emerg("set endpoints into discovery DICT failed, ", err)
       end
   end
   
   local endpoint_resource = {
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
       max_resource_version = 0,
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=45"
           else
               return "limit=45&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                      self.max_resource_version)
       end,
       pre_list_callback = function(self)
           self.max_resource_version = 0
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end
   }
   
   local function event_dispatch(resource, event, object, drive)
       local rvstr = object.metadata.resourceVersion
       if rvstr ~= nil then
           local rv = tonumber(rvstr)
           if resource.max_resource_version < rv then
               resource.max_resource_version = rv
           end
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
           resource:deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because max_resource_version has already been recorded above
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(continue),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res:read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       if resource_version ~= nil then
           local rvv = tonumber(resource_version)
           -- only record the newest version; returning early here would skip
           -- dispatching the items of the following pages
           if rvv > resource.max_resource_version then
               resource.max_resource_version = rvv
           end
       end
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "list")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           return list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 1800 + math.random(60, 1200)
       local allowance_seconds = 120
       httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) * 1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res:read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
           -- reset the per-chunk match counter; otherwise offsets from the
           -- previous chunk would leak into this one
           capturedSize = 0
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watch")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       -- tail call so that the status of the re-established watch propagates to the caller
       return watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local ok = false
           local reason, message = "", ""
           local intervalTime = 0
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
               if not ok then
                   resource.watch_state = "connecting"
                   core.log.error("connect apiserver failed , apiserver_host: ", apiserver_host, "apiserver_port",
                       apiserver_port, "message : ", message)
                   intervalTime = 200
                   break
               end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed , resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 200
                   break
               end
               resource.watch_state = "list finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watch failed"
                   core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watch finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function create_lrucache(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       if not endpoint then
           core.log.error("get emppty endpoint from discovery DICT,this should not happen ", service_name)
           return nil
       end
   
       local t, _ = core.json.decode(endpoint)
       if not t then
           core.log.error("json decode endpoint failed, this should not happen, content : ", endpoint)
       end
       return t[port_name]
   end
   
   local _M = {
       version = 0.01
   }
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
       local match, _ = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.info("get unexpected upstream service_name: ", service_name)
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, err = shared_endpoints:get_stale(k8s_service_name .. "#version")
       if not version then
           core.log.info("get emppty endpoint version from discovery DICT ", k8s_service_name)
           return nil
       end
       return lrucache(service_name, version, create_lrucache, k8s_service_name, k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileg,med agent" then
           return
       end
   
       local err
       namespace, err = util.read_file("/home/adugeek/Temp/namespace")
       if not namespace or namespace == "" then
           end_world("get empty namespace value " .. (err or ""))
           return
       end
   
       token, err = util.read_file("/home/adugeek/Temp/token")
       if not token or token == "" then
           end_world("get empty token value " .. (err or ""))
           return
       end
   
       apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
       if not apiserver_host or apiserver_host == "" then
           end_world("get empty KUBERNETES_SERVICE_HOST value")
       end
   
       apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
       if not apiserver_port or apiserver_port == "" then
           end_world("get empty KUBERNETES_SERVICE_PORT value")
       end
   
       ngx_timer_at(0, fetch_resource, endpoint_resource)
   end
   
   return _M
   
   
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441


   @tao12345666333 @tokers @lookbrook @zou8944 
   Sorry, I will create a PR within one week.
   But I need someone to teach me how to create test cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857315028


   > @adugeek Actually we considered this plan, but the biggest obstacle is that we have to do many disk IO operations, especially for Service endpoints changes.
   
   "Endpoints change" is not a big problem; just provide a k8s discovery.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860195520


   I have committed this implementation.
   The link is https://github.com/adugeek/ApisixController
   @tokers


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860454680


   @adugeek Wow, what a masterpiece! Are you using it in production now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857613699


   > > @adugeek Actually we considered this plan, but the biggest obstacle is that we have to do many disk IO operations, especially for Service endpoints changes.
   > 
   > "Endpoints change" is not a big problem; just provide a k8s discovery.
   
   I mean, too-frequent endpoints changes will make the disk I/O heavy.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-903217461


   @adugeek Just refer to the chaos mesh testing: https://github.com/apache/apisix/blob/master/.github/workflows/chaos.yml.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too-frequent endpoints changes will make the disk I/O heavy.
   
   I understand your concerns.
   But what I mean is that endpoint changes are sensed by the k8s discovery and written to ngx.shared.DICT, not to apisix.yaml.
   There is no disk I/O here.
   
   
   Talk is cheap. Let's look at the code
   
   ```lua
   local local_conf = require("apisix.core.config_local").local_conf()
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local ngx_timer_at = ngx.timer.at
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local shared_endpoints = ngx.shared.discovery
   
   local namespace = ""
   local token = ""
   local apiserver_host = ""
   local apiserver_port = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
   
   local function create_nodes(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       core.log.error("get endpoint: ", endpoint)
       if not endpoint then
           return nil
       end
   
       local t, err = core.json.decode(endpoint)
       if not t then
           core.log.error("decode [[", endpoint, "]] error : ", err or "")
           return nil
       end
       local v = t[port_name]
       core.log.info("get port: ", port_name, core.json.encode(v, true))
       return v
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local watch_resource_list = {{
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
   
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=30"
           else
               return "limit=30&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                      self.max_resource_version)
       end,
       max_resource_version = 0,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end,
       pre_list_callback = function(self)
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end
   }}
   local watch_threads = {}
   
   local function event_dispatch(resource, event, object, drive)
       local rvstr = object.metadata.resourceVersion
       if rvstr ~= nil then
           local rv = tonumber(rvstr)
           if resource.max_resource_version < rv then
               resource.max_resource_version = rv
           end
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
           resource:deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because max_resource_version has already been recorded above
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(continue),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res:read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       core.log.info("list resource version ", resource_version)
       if resource_version ~= nil then
           local rvv = tonumber(resource_version)
           -- only record the newest version; returning early here would skip
           -- dispatching the items of the following pages
           if rvv > resource.max_resource_version then
               resource.max_resource_version = rvv
           end
       end
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "listing")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           return list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 8800 + math.random(200, 1000)
       local allowance_seconds = 100
       httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res:read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
           -- reset the per-chunk match counter; otherwise offsets from the
           -- previous chunk would leak into this one
           capturedSize = 0
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watching")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       -- tail call so that the status of the re-established watch propagates to the caller
       return watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local intervalTime = 0
           local reason, message = "", ""
           local ok = false
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
               if not ok then
                   resource.watch_state = "connecting"
                   core.log
                       .error("connect failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 200
                   break
               end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "listing finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watching failed"
                   core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watching finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function fetch_resources()
       for i, resource in ipairs(watch_resource_list) do
           watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
       end
       ngx.thread.wait(core.table.unpack(watch_threads))
   end
   
   local function end_world(reason)
       core.log.warn("send USER1 signal to master process [", process.get_master_pid(), "] for reopening log file")
       -- -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
       -- if not ok then
       --     core.log.error("failed to send USER1 signal for reopening log file: ", err)
       -- end
   end
   
   local schema = {
       type = "object",
       properties = {},
       additionalProperties = false
   }
   
   local _M = {
       version = 0.1
   }
   
   function _M.check_schema(conf)
       return true
   end
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
       local match, err = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.error("get nodes for service error: ", err or "")
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
       if not version then
           core.log.error("get version: ", version)
           return nil
       end
       core.log.error("get version: ", version)
       return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileged agent" then
           return
       end
   
       local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
       if not ok then
           error("invalid k8s discovery configuration: " .. err)
           return
       end
   
       local err
       namespace, err = util.read_file("/home/adugeek/Temp/namespace")
       if err then
           end_world(err)
           return
       end
   
       core.log.info("here")
       token, err = util.read_file("/home/adugeek/Temp/token")
       if err then
           end_world(err)
           return
       end
   
       core.log.info("here")
       -- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
   
       -- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
   
       apiserver_host = "127.0.0.1"
       apiserver_port = "8001"
   
       ngx_timer_at(0, fetch_resources)
   end
   
   return _M
   
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-903045705


   It's not about how to send requests and check responses with the "testing framework";
   
   it's about how to create a k8s cluster, build an APISIX Docker image, and deploy and scale a test service within the "testing framework".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441


   @tao12345666333 @tokers @lookbrook @zou8944 
   Sorry, I will create a PR within one week.
   But I need someone to teach me how to create test cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858216378


   For Routes, Services, Upstreams, ...
   
   First: we can deploy CRDs, just like any other xController.
   
   Then, we can develop a "controller.lua" as an apisix plugin.
   Its function is to list-watch the CRDs and then write them to apisix.yaml (a rough sketch follows below).
   
   If "Routes, Services, Upstreams" change frequently and cause heavy disk I/O, we can develop an "apisix/core/config_memory.lua";
   usually, this is not needed.
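   
   A rough sketch of what such a "controller.lua" resource description could look like, reusing the resource-table pattern from the endpoints code earlier in this thread; the CRD group, version and plural below are assumptions for illustration, not an existing plugin:
   
   ```lua
   -- Hypothetical resource table for a CRD list-watch, in the same shape as the
   -- endpoint_resource table above; the list_query/watch_query functions would
   -- match the endpoints code. The callbacks would translate CRD objects into
   -- route entries and rewrite apisix.yaml.
   local string = string
   
   local namespace = "default"   -- would be read from the service account files
   
   local route_resource = {
       group = "apisix.apache.org",   -- assumed CRD group
       version = "v2beta1",           -- assumed CRD version
       kind = "ApisixRoute",
       listKind = "ApisixRouteList",
       plural = "apisixroutes",
       max_resource_version = 0,
   
       list_path = function(self)
           return string.format("/apis/%s/%s/namespaces/%s/apisixroutes",
                                self.group, self.version, namespace)
       end,
   
       watch_path = function(self)
           return self:list_path()
       end,
   
       added_callback = function(self, object)
           -- translate the CRD object into a route entry and rewrite apisix.yaml
       end,
       modified_callback = function(self, object)
           -- same handling as added
       end,
       deleted_callback = function(self, object)
           -- drop the route entry and rewrite apisix.yaml
       end,
   }
   
   return route_resource
   ```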


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too frequent endpoints change will make the disk io heavy.
   
   I understand your concerns.
   But what I mean is that changes in endpoints will be sensed by the k8s discovery and written to ngx.shared.DICT, not to apisix.yaml.
   So there is no disk IO.
   
   Talk is cheap. Let's look at the code:
   
   ```lua
   local local_conf = require("apisix.core.config_local").local_conf()
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local ngx_timer_at = ngx.timer.at
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local shared_endpoints = ngx.shared.discovery
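    -- NOTE: this assumes a shared memory zone named "discovery" has been
    -- declared (e.g. `lua_shared_dict discovery 1m;` in the nginx conf);
    -- the privileged agent below writes endpoints into it and the worker
    -- processes read from it in _M.nodes()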
   
   local namespace = ""
   local token = ""
   local apiserver_host = ""
   local apiserver_port = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
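    -- core.lrucache keys cached objects by (key, version); _M.nodes() below
    -- passes the endpoints' resourceVersion as the version, so the cached
    -- node list is rebuilt whenever the Endpoints object changes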
   
   local function create_nodes(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       core.log.error("get endpoint: ", endpoint)
       if not endpoint then
           return nil
       end
   
       local t, err = core.json.decode(endpoint)
       if not t then
           core.log.error("decode [[", endpoint, "]] error : ", err or "")
       end
       local v = t[port_name]
       core.log.info("get port: ", port_name, core.json.encode(v, true))
       return v
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
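    
    -- each entry in this list describes one resource type to list-watch;
    -- presumably the namespaced CRDs from the proposal (routes, services,
    -- upstreams) could be added here as further entries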
   
   local watch_resource_list = {{
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
   
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=30"
           else
               return "limit=30&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                      self.max_resource_version)
       end,
       max_resource_version = 0,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end,
       pre_list_callback = function(self)
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end
   }}
   local watch_threads = {}
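    
    -- ADDED/MODIFIED/DELETED events update the shared dict through the
    -- callbacks above; BOOKMARK events only advance max_resource_version
    -- so that the next watch request can resume from it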
   
   local function event_dispatch(resource, event, object, drive)
       local rvstr = object.metadata.resourceVersion
       if rvstr ~= nil then
           local rv = tonumber(rvstr)
           if resource.max_resource_version < rv then
               resource.max_resource_version = rv
           end
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
            resource:deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because we had record max_resource_version to resource.max_resource_version
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       core.log.info("list resource version ", resource_version)
       if resource_version ~= nil then
           local rvv = tonumber(resource_version)
           if rvv <= resource.max_resource_version then
                -- nothing newer than what we have already processed
                return true, "Success", ""
           end
           resource.max_resource_version = rvv
       end
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "listing")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 8800 + math.random(200, 1000)
       local allowance_seconds = 100
       httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res.read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
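        -- the watch API streams newline-delimited JSON events; a chunk from
        -- body_reader may end in the middle of an event, so any unparsed
        -- bytes are kept in remaindBody and prepended to the next chunk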
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watching")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local intervalTime = 0
           local reason, message = "", ""
           local ok = false
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
                if not ok then
                    resource.watch_state = "connect failed"
                    core.log.error("connect failed, resource: ", resource.plural, " message: ", message)
                    intervalTime = 200
                    break
                end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "listing finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watching failed"
                   core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watching finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function fetch_resources()
       for i, resource in ipairs(watch_resource_list) do
           watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
       end
       ngx.thread.wait(core.table.unpack(watch_threads))
   end
   
    local function end_world(reason)
        core.log.error("k8s discovery cannot continue: ", reason)
        -- stopping the whole process (e.g. by signalling the master) is left
        -- disabled in this draft
        -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
        -- if not ok then
        --     core.log.error("failed to send signal to master process: ", err)
        -- end
    end
   
   local schema = {
       type = "object",
       properties = {},
       additionalProperties = false
   }
   
   local _M = {
       version = 0.1
   }
   
   function _M.check_schema(conf)
       return true
   end
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
       local match, err = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.error("get nodes for service error: ", err or "")
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
        if not version then
            core.log.info("no endpoints version found for service: ", k8s_service_name)
            return nil
        end
        core.log.info("get version: ", version)
       return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileged agent" then
           return
       end
   
       local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
       if not ok then
           error("invalid k8s discovery configuration: " .. err)
           return
       end
   
        -- NOTE: the file paths and the apiserver address below are hard-coded
        -- for local testing (e.g. against `kubectl proxy`); inside a pod they
        -- would come from the service account files and the
        -- KUBERNETES_SERVICE_* environment variables
        namespace, err = util.read_file("/home/adugeek/Temp/namespace")
        if err then
            end_world(err)
            return
        end

        token, err = util.read_file("/home/adugeek/Temp/token")
        if err then
            end_world(err)
            return
        end

        -- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
        -- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
        apiserver_host = "127.0.0.1"
        apiserver_port = "8001"
   
       ngx_timer_at(0, fetch_resources)
   end
   
   return _M
   
   ```
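   
   As a usage note (my reading of the code above, not documented behaviour): `_M.nodes()` expects `service_name` in the form `<k8s-service-name>:<port-name>`, so resolving the pods behind port `http` of a Service named `my-service` would look roughly like this, assuming the module is installed as `apisix/discovery/k8s.lua`:
   
   ```lua
    -- hypothetical call site; in practice APISIX calls nodes() itself for
    -- upstreams configured with the matching discovery type and service_name
    local k8s_discovery = require("apisix.discovery.k8s")

    -- returns a sorted array of { host = <pod ip>, port = <port>, weight = 50 },
    -- or nil if the watcher has not seen the endpoints yet
    local nodes = k8s_discovery.nodes("my-service:http")
   ```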
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858197762


   All right, I thought you were going to write all configurations into apisix.yaml. It seems like you want to modify APISIX so that it can communicate with the Kube API Server directly.
   
   So how should we handle the other data, like Routes and Consumers?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858480685


   Well, this way might be clean and native if we implement the controller in Lua, but it is not a good idea from the perspective of ecosystem integration: we cannot enjoy the convenience of Kubernetes-related tools, packages, or even their community members.
   
   cc @tao12345666333 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857299316


   @adugeek Actually we considered this plan, but the biggest obstacle is that we would have to do many disk IO operations, especially for Service endpoint changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-899031680


   > Not yet, under testing now.
   
   @adugeek Any updates?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] zou8944 commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
zou8944 commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-898989866


   I think it is useful too. How is it going?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609


   > I mean, too frequent endpoints change will make the disk io heavy.
   
   I understand your concerns.
   But what I mean is that changes in endpoints will be sensed by the k8s discovery and written to ngx.shared.DICT, not written to apisix.yaml.
   There is no disk IO here.
   
   
   Talk is cheap. Let's look at the code:
   
   ```lua
   local local_conf = require("apisix.core.config_local").local_conf()
   local ipairs = ipairs
   local ngx = ngx
   local string = string
   local tonumber = tonumber
   local math = math
   local ngx_timer_at = ngx.timer.at
   local process = require("ngx.process")
   local core = require("apisix.core")
   local util = require("apisix.cli.util")
   local http = require("resty.http")
   local shared_endpoints = ngx.shared.discovery
   
   local namespace = ""
   local token = ""
   local apiserver_host = ""
   local apiserver_port = ""
   
   local default_weight = 50
   
   local lrucache = core.lrucache.new({
       ttl = 300,
       count = 1024
   })
   
   local function create_nodes(service_name, port_name)
       local endpoint, _, _ = shared_endpoints:get_stale(service_name)
       core.log.error("get endpoint: ", endpoint)
       if not endpoint then
           return nil
       end
   
       local t, err = core.json.decode(endpoint)
       if not t then
           core.log.error("decode [[", endpoint, "]] error : ", err or "")
       end
       local v = t[port_name]
       core.log.info("get port: ", port_name, core.json.encode(v, true))
       return v
   end
   
   local function sort_by_key_host(a, b)
       return a.host < b.host
   end
   
   local function on_endpoint_deleted(endpoint)
       shared_endpoints:delete(endpoint.metadata.name)
   end
   
   local function on_endpoint_modified(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return on_endpoint_deleted(endpoint)
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local function on_endpoint_added(endpoint)
       local subsets = endpoint.subsets
       if subsets == nil or #subsets == 0 then
           return
       end
   
       local subset = subsets[1]
   
       local addresses = subset.addresses
       if addresses == nil or #addresses == 0 then
           return
       end
   
       local ports = subset.ports
       if ports == nil or #ports == 0 then
           return
       end
   
       local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
       for _, port in ipairs(ports) do
           local nodes = core.table.new(#addresses, 0)
           for i, address in ipairs(addresses) do
               nodes[i] = {
                   host = address.ip,
                   port = port.port,
                   weight = default_weight
               }
           end
           core.table.sort(nodes, sort_by_key_host)
           t[port.name] = nodes
       end
       core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
       shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
       core.tablepool.release("k8s#endpoint#subsets", t)
   end
   
   local watch_resource_list = {{
       version = "v1",
       kind = "Endpoints",
       listKind = "EndpointsList",
       plural = "endpoints",
   
       list_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
   
       list_query = function(self, continue)
           if continue == nil or continue == "" then
               return "limit=30"
           else
               return "limit=30&continue=" .. continue
           end
       end,
   
       watch_path = function(self)
           return string.format("/api/v1/namespaces/%s/endpoints", namespace)
       end,
       watch_query = function(self, timeout)
           return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                      self.max_resource_version)
       end,
       max_resource_version = 0,
       added_callback = function(self, object, drive)
           on_endpoint_added(object)
       end,
       modified_callback = function(self, object)
           on_endpoint_modified(object)
       end,
       deleted_callback = function(self, object)
           on_endpoint_deleted(object)
       end,
       pre_list_callback = function(self)
           shared_endpoints:flush_all()
       end,
       post_list_callback = function(self)
           shared_endpoints:flush_expired()
       end
   }}
   local watch_threads = {}
   
   local function event_dispatch(resource, event, object, drive)
       local rvstr = object.metadata.resourceVersion
       if rvstr ~= nil then
           local rv = tonumber(rvstr)
           if resource.max_resource_version < rv then
               resource.max_resource_version = rv
           end
       end
   
       if event == "ADDED" then
           resource:added_callback(object, drive)
       elseif event == "MODIFIED" then
           if object.deletionTimestamp ~= nil then
               resource:deleted_callback(object)
           else
               resource:modified_callback(object)
           end
       elseif event == "DELETED" then
            resource:deleted_callback(object)
       elseif event == "BOOKMARK" then
           -- do nothing because we had record max_resource_version to resource.max_resource_version
       end
   end
   
   local function list_resource(httpc, resource, continue)
       httpc:set_timeouts(2000, 2000, 3000)
       local res, err = httpc:request({
           path = resource:list_path(),
           query = resource:list_query(),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if not res then
           return false, "RequestError", err or ""
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body() or ""
       end
   
       local body, err = res:read_body()
       if err then
           return false, "ReadBodyError", err
       end
   
       local data, _ = core.json.decode(body)
       if not data or data.kind ~= resource.listKind then
           return false, "UnexpectedBody", body
       end
   
       local resource_version = data.metadata.resourceVersion
       core.log.info("list resource version ", resource_version)
       if resource_version ~= nil then
           local rvv = tonumber(resource_version)
           if rvv <= resource.max_resource_version then
                -- nothing newer than what we have already processed
                return true, "Success", ""
           end
           resource.max_resource_version = rvv
       end
   
       for _, item in ipairs(data.items) do
           event_dispatch(resource, "ADDED", item, "listing")
       end
   
       if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
           list_resource(httpc, resource, data.metadata.continue)
       end
   
       return true, "Success", ""
   end
   
   local function watch_resource(httpc, resource)
       math.randomseed(process.get_master_pid())
       local watch_seconds = 8800 + math.random(200, 1000)
       local allowance_seconds = 100
       httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
       local res, err = httpc:request({
           path = resource:watch_path(),
           query = resource:watch_query(watch_seconds),
           headers = {
               ["Authorization"] = string.format("Bearer %s", token)
           }
       })
   
       if err then
           return false, "RequestError", err
       end
   
       if res.status ~= 200 then
           return false, res.reason, res.read_body and res.read_body()
       end
   
       local remaindBody = ""
       local body = ""
       local reader = res.body_reader
       local gmatchIterator;
       local captures;
       local capturedSize = 0
       while true do
   
           body, err = reader()
           if err then
               return false, "ReadBodyError", err
           end
   
           if not body then
               break
           end
   
           if #remaindBody ~= 0 then
               body = remaindBody .. body
           end
   
           gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
           if not gmatchIterator then
               return false, "GmatchError", err
           end
   
           while true do
               captures, err = gmatchIterator()
               if err then
                   return false, "GmatchError", err
               end
               if not captures then
                   break
               end
               capturedSize = capturedSize + #captures[0]
               local v, _ = core.json.decode(captures[0])
               if not v or not v.object or v.object.kind ~= resource.kind then
                   return false, "UnexpectedBody", captures[0]
               end
               event_dispatch(resource, v.type, v.object, "watching")
           end
   
           if capturedSize == #body then
               remaindBody = ""
           elseif capturedSize == 0 then
               remaindBody = body
           else
               remaindBody = string.sub(body, capturedSize + 1)
           end
       end
       watch_resource(httpc, resource)
   end
   
   local function fetch_resource(resource)
       while true do
           local intervalTime = 0
           local reason, message = "", ""
           local ok = false
           repeat
               local httpc = http.new()
               resource.watch_state = "connecting"
               core.log.info("begin to connect ", resource.plural)
               ok, message = httpc:connect({
                   scheme = "https",
                   host = apiserver_host,
                   port = tonumber(apiserver_port),
                   ssl_verify = false
               })
                if not ok then
                    resource.watch_state = "connect failed"
                    core.log.error("connect failed, resource: ", resource.plural, " message: ", message)
                    intervalTime = 200
                    break
                end
   
               core.log.info("begin to list ", resource.plural)
               resource.watch_state = "listing"
               resource:pre_list_callback()
               ok, reason, message = list_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "listing failed"
                   core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "listing finished"
               resource:post_list_callback()
   
               core.log.info("begin to watch ", resource.plural)
               resource.watch_state = "watching"
               ok, reason, message = watch_resource(httpc, resource)
               if not ok then
                   resource.watch_state = "watching failed"
                   core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                   intervalTime = 100
                   break
               end
               resource.watch_state = "watching finished"
               intervalTime = 0
           until true
           ngx.sleep(intervalTime)
       end
   end
   
   local function fetch_resources()
       for i, resource in ipairs(watch_resource_list) do
           watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
       end
       ngx.thread.wait(core.table.unpack(watch_threads))
   end
   
    local function end_world(reason)
        core.log.error("k8s discovery cannot continue: ", reason)
        -- stopping the whole process (e.g. by signalling the master) is left
        -- disabled in this draft
        -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
        -- if not ok then
        --     core.log.error("failed to send signal to master process: ", err)
        -- end
    end
   
   local schema = {
       type = "object",
       properties = {},
       additionalProperties = false
   }
   
   local _M = {
       version = 0.1
   }
   
   function _M.check_schema(conf)
       return true
   end
   
   function _M.nodes(service_name)
       local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
       local match, err = ngx.re.match(service_name, pattern, "jiao")
       if not match then
           core.log.error("get nodes for service error: ", err or "")
           return nil
       end
       local k8s_service_name = match[1]
       local k8s_port_name = match[2]
       local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
        if not version then
            core.log.info("no endpoints version found for service: ", k8s_service_name)
            return nil
        end
        core.log.info("get version: ", version)
       return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
   end
   
   function _M.init_worker()
       if process.type() ~= "privileged agent" then
           return
       end
   
       local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
       if not ok then
           error("invalid k8s discovery configuration: " .. err)
           return
       end
   
        -- NOTE: the file paths and the apiserver address below are hard-coded
        -- for local testing (e.g. against `kubectl proxy`); inside a pod they
        -- would come from the service account files and the
        -- KUBERNETES_SERVICE_* environment variables
        namespace, err = util.read_file("/home/adugeek/Temp/namespace")
        if err then
            end_world(err)
            return
        end

        token, err = util.read_file("/home/adugeek/Temp/token")
        if err then
            end_world(err)
            return
        end

        -- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
        -- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
        apiserver_host = "127.0.0.1"
        apiserver_port = "8001"
   
       ngx_timer_at(0, fetch_resources)
   end
   
   return _M
   
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860195520


   I have committed this implementation.
   The link is https://github.com/adugeek/ApisixController
   @tokers


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441


   @tao12345666333 @tokers @lookbrook @zou8944 
   Sorry, I will create a PR within one week,
   but I need someone to teach me how to create test cases.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900970828


   > @tao12345666333 @tokers @lookbrook @zou8944
   > sorry, I will create pr in on week .
   > but , I need some one teach me how to create test cases
   
   See https://github.com/apache/apisix/blob/master/docs/en/latest/internal/testing-framework.md for details.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860531094


   Not yet, under testing now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] lookbrook commented on issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
lookbrook commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-880543329


   The k8s endpoints discovery is very useful; I think it should be merged into APISIX.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] zhixiongdu027 closed issue #4388: Another implementation of ingress controller

Posted by GitBox <gi...@apache.org>.
zhixiongdu027 closed issue #4388:
URL: https://github.com/apache/apisix/issues/4388


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org