You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2021/06/08 01:42:51 UTC
[GitHub] [apisix] adugeek opened a new issue #4388: Another implementation of ingress controller
adugeek opened a new issue #4388:
URL: https://github.com/apache/apisix/issues/4388
Although there is an official implementation of Ingress Controller, it is not very practical in some scenarios.
1: Small-scale cluster. If your cluster only has 1~2 nodes, you cannot provide a stable Etcd deployment environment. Even if you deploy it in a certain way, it will consume additional resources.
2: The general private k8s cluster does not have a stable external storage service. Even if the number of nodes is sufficient, it is not convenient to deploy etcd
3: An isolated multi-tenant cluster through the namespace. The appearance of the Ingress Controller as a cluster entry will break the isolation of tenants. Even creating a new Ingress Contoller Class for each tenant is not appropriate.
Based on the above reasons, I designed a new implementation of Ingress Controller.
The main points of implementation are as follows:
1: Define CRD on the cluster
2: Deploy apisix service in each namespace of the cluster as and only as the traffic entrance of that namespace
3: Each apisix uses the privileged process list-watch k8s namespaced crd resources, and then writes it to conf/apisix.yaml
4: Implement k8s discovery (list-watch k8s namespaced enpoints)
This way
No need for additional development language and framework intervention,
No need for additional etcd, reduced data transfer process,
It is more robust than the official implementation.
The only implementation difficulty may be how to implement webhooks.
If use client-go development, it is not convenient to verify config schema, plugin schema.
If use the apisix plugin, it is not convenient to verify the uniqueness of service_id, upstream_id and route_id because of the nginx process model.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tao12345666333 commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tao12345666333 commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-887230920
> The k8s endpoints discovery is very useful,I think it should be merge into apisix
I agree. @adugeek would you like to add a `kubernetes` discovery plugin for Apache APISIX?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery and written to ngx.shared.DICT. but not write to apisix.yaml .
There is no disk io here.
Talk is cheap. Let's look at the code
```lua
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local os = os
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local signal = require("resty.signal")
local ngx_timer_at = ngx.timer.at
local shared_endpoints = ngx.shared.discovery
local apiserver_host = ""
local apiserver_port = ""
local namespace = ""
local token = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local cache_table = {}
local function end_world(reason)
core.log.emerg(reason)
signal.kill(process.get_master_pid(), signal.signum("QUIT"))
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoint version into discovery DICT failed ,", err)
end
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoint into discovery DICT failed ,", err)
end
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:deleted(endpoint.metadata.name .. "#version")
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoints version into discovery DICT failed ,", err)
end
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoints into discovery DICT failed ,", err)
end
end
local endpoint_resource = {
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
max_resource_version = 0,
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=45"
else
return "limit=45&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
self.max_resource_version)
end,
pre_list_callback = function(self)
self.max_resource_version = 0
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end
}
local function event_dispatch(resource, event, object, drive)
if drive == "watch" then
local resource_version = object.metadata.resourceVersion
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource:deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
resource.max_resource_version = tonumber(resource_version)
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "list")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 1800 + math.random(60, 1200)
local allowance_seconds = 120
httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) * 1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watch")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local ok = false
local reason, message = "", ""
local intervalTime = 0
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log.error("connect apiserver failed , apiserver_host: ", apiserver_host, "apiserver_port",
apiserver_port, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed , resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 200
break
end
resource.watch_state = "list finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watch failed"
core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watch finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function create_lrucache(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
if not endpoint then
core.log.error("get emppty endpoint from discovery DICT,this should not happen ", service_name)
return nil
end
local t, _ = core.json.decode(endpoint)
if not t then
core.log.error("json decode endpoint failed, this should not happen, content : ", endpoint)
end
return t[port_name]
end
local _M = {
version = 0.01
}
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
local match, _ = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.info("get unexpected upstream service_name: ", service_name)
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, err = shared_endpoints:get_stale(k8s_service_name .. "#version")
if not version then
core.log.info("get emppty endpoint version from discovery DICT ", k8s_service_name)
return nil
end
return lrucache(service_name, version, create_lrucache, k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileg,med agent" then
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if not namespace or namespace == "" then
end_world("get empty namespace value " .. (err or ""))
return
end
token, err = util.read_file("/home/adugeek/Temp/token")
if not token or token == "" then
end_world("get empty token value " .. (err or ""))
return
end
apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
if not apiserver_host or apiserver_host == "" then
end_world("get empty KUBERNETES_SERVICE_HOST value")
end
apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
if not apiserver_port or apiserver_port == "" then
end_world("get empty KUBERNETES_SERVICE_PORT value")
end
ngx_timer_at(0, fetch_resource, endpoint_resource)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery and written to ngx.shared.DICT. but not write to apisix.yaml .
There is no disk io here.
Talk is cheap. Let's look at the code
```lua
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local ngx_timer_at = ngx.timer.at
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local shared_endpoints = ngx.shared.discovery
local signal = require("resty.signal")
local os = os
local apiserver_host = ""
local apiserver_port = ""
local namespace = ""
local token = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local cache_table = {}
local function end_world(reason)
core.log.emerg(reason)
signal.kill(process.get_master_pid(), signal.signum("QUIT"))
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoints version into discovery DICT field ,", err)
end
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoints into discovery DICT field ,", err)
end
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:deleted(endpoint.metadata.name .. "#version")
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
core.table.clear(cache_table)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
cache_table[port.name] = nodes
end
local _, err
_, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
if err then
core.log.emerg("set endpoints version into discovery DICT field ,", err)
end
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
if err then
core.log.emerg("set endpoints into discovery DICT field ,", err)
end
end
local endpoint_resource = {
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
max_resource_version = 0,
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=45"
else
return "limit=45&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
self.max_resource_version)
end,
pre_list_callback = function(self)
self.max_resource_version = 0
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end
}
local function event_dispatch(resource, event, object, drive)
local rvstr = object.metadata.resourceVersion
if rvstr ~= nil then
local rv = tonumber(rvstr)
if resource.max_resource_version < rv then
resource.max_resource_version = rv
end
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource:deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
if resource_version ~= nil then
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "list")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 1800 + math.random(60, 1200)
local allowance_seconds = 120
httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) * 1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watch")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local ok = false
local reason, message = "", ""
local intervalTime = 0
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log.error("connect apiserver failed , apiserver_host: ", apiserver_host, "apiserver_port",
apiserver_port, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed , resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 200
break
end
resource.watch_state = "list finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watch failed"
core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watch finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function create_lrucache(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
if not endpoint then
core.log.error("get emppty endpoint from discovery DICT,this should not happen ", service_name)
return nil
end
local t, _ = core.json.decode(endpoint)
if not t then
core.log.error("json decode endpoint failed, this should not happen, content : ", endpoint)
end
return t[port_name]
end
local _M = {
version = 0.01
}
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
local match, _ = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.info("get unexpected upstream service_name: ", service_name)
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, err = shared_endpoints:get_stale(k8s_service_name .. "#version")
if not version then
core.log.info("get emppty endpoint version from discovery DICT ", k8s_service_name)
return nil
end
return lrucache(service_name, version, create_lrucache, k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileg,med agent" then
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if not namespace or namespace == "" then
end_world("get empty namespace value " .. (err or ""))
return
end
token, err = util.read_file("/home/adugeek/Temp/token")
if not token or token == "" then
end_world("get empty token value " .. (err or ""))
return
end
apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
if not apiserver_host or apiserver_host == "" then
end_world("get empty KUBERNETES_SERVICE_HOST value")
end
apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
if not apiserver_port or apiserver_port == "" then
end_world("get empty KUBERNETES_SERVICE_PORT value")
end
ngx_timer_at(0, fetch_resource, endpoint_resource)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441
@tao12345666333 @tokers @lookbrook @zou8944
sorry, I will create pr in with on week .
but , I need some one teach me how to create test cases
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857315028
> @adugeek Actually we considered this plan but the biggest obstacle is we have to do many disk IO operations, especially for the Service endpoints change.
"Endpoints Change" is not a big problem, just provide a k8s discovery.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860195520
I had commit this implement.
link is https://github.com/adugeek/ApisixController
@tokers
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860454680
@adugeek Wow, what a good masterpiece! Are you using it in your production now?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857613699
> > @adugeek Actually we considered this plan but the biggest obstacle is we have to do many disk IO operations, especially for the Service endpoints change.
>
> "Endpoints Change" is not a big problem, just provide a k8s discovery.
I mean, too frequent endpoints change will make the disk io heavy.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-903217461
@adugeek Just refer to the chaos mesh testing: https://github.com/apache/apisix/blob/master/.github/workflows/chaos.yml.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery and written to ngx.shared.DICT. but not write to apisix.yaml .
There is no disk io here.
Talk is cheap. Let's look at the code
```lua
local local_conf = require("apisix.core.config_local").local_conf()
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local ngx_timer_at = ngx.timer.at
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local shared_endpoints = ngx.shared.discovery
local namespace = ""
local token = ""
local apiserver_host = ""
local apiserver_port = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local function create_nodes(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
core.log.error("get endpoint: ", endpoint)
if not endpoint then
return nil
end
local t, err = core.json.decode(endpoint)
if not t then
core.log.error("decode [[", endpoint, "]] error : ", err or "")
end
local v = t[port_name]
core.log.info("get port: ", port_name, core.json.encode(v, true))
return v
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local watch_resource_list = {{
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=30"
else
return "limit=30&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
self.max_resource_version)
end,
max_resource_version = 0,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end,
pre_list_callback = function(self)
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end
}}
local watch_threads = {}
local function event_dispatch(resource, event, object, drive)
local rvstr = object.metadata.resourceVersion
if rvstr ~= nil then
local rv = tonumber(rvstr)
if resource.max_resource_version < rv then
resource.max_resource_version = rv
end
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource:deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
core.log.info("list resource version ", resource_version)
if resource_version ~= nil then
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "listing")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 8800 + math.random(200, 1000)
local allowance_seconds = 100
httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watching")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local intervalTime = 0
local reason, message = "", ""
local ok = false
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log
.error("connect failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "listing finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watching failed"
core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watching finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function fetch_resources()
for i, resource in ipairs(watch_resource_list) do
watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
end
ngx.thread.wait(core.table.unpack(watch_threads))
end
local function end_world(reason)
core.log.warn("send USER1 signal to master process [", process.get_master_pid(), "] for reopening log file")
-- -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
-- if not ok then
-- core.log.error("failed to send USER1 signal for reopening log file: ", err)
-- end
end
local schema = {
type = "object",
properties = {},
additionalProperties = false
}
local _M = {
version = 0.1
}
function _M.check_schema(conf)
return true
end
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
local match, err = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.error("get nodes for service error: ", err or "")
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
if not version then
core.log.error("get version: ", version)
return nil
end
core.log.error("get version: ", version)
return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileged agent" then
return
end
local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
if not ok then
error("invalid k8s discovery configuration: " .. err)
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if err then
end_world(err)
return
end
core.log.info("here")
token, err = util.read_file("/home/adugeek/Temp/token")
if err then
end_world(err)
return
end
core.log.info("here")
-- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
-- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
apiserver_host = "127.0.0.1"
apiserver_port = "8001"
ngx_timer_at(0, fetch_resources)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-903045705
It's not about how to send request and check response by "testing framework",
It's about how to create an k8s cluster ,build an apisix docker ,deploy and scale test service in "testing framework"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441
@tao12345666333 @tokers @lookbrook @zou8944
sorry, I will create pr in on week .
but , I need some one teach me how to create test cases
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441
@tao12345666333 @tokers @lookbrook @zou8944
sorry, I will create pr in on week .
but , I need some one teach me how to create test cases
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858216378
For Routes and Services, Upstreams, ...
First: we can deploy CRD, just like any other xController
Then, we can develop a "controller.lua" as apisix plugins.
Its function is list-watch crds then write to apisix.yaml
If "Routes, Services, Upstreams" changes frequently, cause heavy disk io, we can develop a "apisix/core/config_memory.lua",
usually, this is not needed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery and written to ngx.shared.DICT. but not to apisix.yaml.
And no disk io
Talk is cheap. Let's look at the code
```lua
local local_conf = require("apisix.core.config_local").local_conf()
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local ngx_timer_at = ngx.timer.at
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local shared_endpoints = ngx.shared.discovery
local namespace = ""
local token = ""
local apiserver_host = ""
local apiserver_port = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local function create_nodes(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
core.log.error("get endpoint: ", endpoint)
if not endpoint then
return nil
end
local t, err = core.json.decode(endpoint)
if not t then
core.log.error("decode [[", endpoint, "]] error : ", err or "")
end
local v = t[port_name]
core.log.info("get port: ", port_name, core.json.encode(v, true))
return v
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local watch_resource_list = {{
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=30"
else
return "limit=30&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
self.max_resource_version)
end,
max_resource_version = 0,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end,
pre_list_callback = function(self)
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end
}}
local watch_threads = {}
local function event_dispatch(resource, event, object, drive)
local rvstr = object.metadata.resourceVersion
if rvstr ~= nil then
local rv = tonumber(rvstr)
if resource.max_resource_version < rv then
resource.max_resource_version = rv
end
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource.deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
core.log.info("list resource version ", resource_version)
if resource_version ~= nil then
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "listing")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 8800 + math.random(200, 1000)
local allowance_seconds = 100
httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watching")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local intervalTime = 0
local reason, message = "", ""
local ok = false
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log
.error("connect failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "listing finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watching failed"
core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watching finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function fetch_resources()
for i, resource in ipairs(watch_resource_list) do
watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
end
ngx.thread.wait(core.table.unpack(watch_threads))
end
local function end_world(reason)
core.log.warn("send USER1 signal to master process [", process.get_master_pid(), "] for reopening log file")
-- -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
-- if not ok then
-- core.log.error("failed to send USER1 signal for reopening log file: ", err)
-- end
end
local schema = {
type = "object",
properties = {},
additionalProperties = false
}
local _M = {
version = 0.1
}
function _M.check_schema(conf)
return true
end
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
local match, err = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.error("get nodes for service error: ", err or "")
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
if not version then
core.log.error("get version: ", version)
return nil
end
core.log.error("get version: ", version)
return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileged agent" then
return
end
local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
if not ok then
error("invalid k8s discovery configuration: " .. err)
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if err then
end_world(err)
return
end
core.log.info("here")
token, err = util.read_file("/home/adugeek/Temp/token")
if err then
end_world(err)
return
end
core.log.info("here")
-- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
-- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
apiserver_host = "127.0.0.1"
apiserver_port = "8001"
ngx_timer_at(0, fetch_resources)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858197762
All right, I thought you were going to write all configurations into apisix.yaml. It seems like you want to modify APISIX so that it can communicate with Kube API Server.
So how do we should handle other data? Like Routes and Consumers?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-858480685
Well, this way might be clean and native if we implement the controller by Lua, but not a good idea from the ecosystem incorporation. We cannot enjoy the convenience of Kubernetes-related tools, packages, and even their community members.
cc @tao12345666333
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857299316
@adugeek Actually we considered this plan but the biggest obstacle is we have to do many disk IO operations, especially for the Service endpoints change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-899031680
> Not yet, under testing now.
@adugeek Any updates?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] zou8944 commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
zou8944 commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-898989866
I think it is useful too, so how is it going.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek edited a comment on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek edited a comment on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-857652609
> I mean, too frequent endpoints change will make the disk io heavy.
I understand your concerns.
But what I mean is that changes in enpoints will be sensed by k8s discvery and written to ngx.shared.DICT. but not write to apisix.yaml .
There is no disk io here.
Talk is cheap. Let's look at the code
```lua
local local_conf = require("apisix.core.config_local").local_conf()
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local ngx_timer_at = ngx.timer.at
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local shared_endpoints = ngx.shared.discovery
local namespace = ""
local token = ""
local apiserver_host = ""
local apiserver_port = ""
local default_weight = 50
local lrucache = core.lrucache.new({
ttl = 300,
count = 1024
})
local function create_nodes(service_name, port_name)
local endpoint, _, _ = shared_endpoints:get_stale(service_name)
core.log.error("get endpoint: ", endpoint)
if not endpoint then
return nil
end
local t, err = core.json.decode(endpoint)
if not t then
core.log.error("decode [[", endpoint, "]] error : ", err or "")
end
local v = t[port_name]
core.log.info("get port: ", port_name, core.json.encode(v, true))
return v
end
local function sort_by_key_host(a, b)
return a.host < b.host
end
local function on_endpoint_deleted(endpoint)
shared_endpoints:delete(endpoint.metadata.name)
end
local function on_endpoint_modified(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return on_endpoint_deleted(endpoint)
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return on_endpoint_deleted(endpoint)
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return on_endpoint_deleted(endpoint)
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local function on_endpoint_added(endpoint)
local subsets = endpoint.subsets
if subsets == nil or #subsets == 0 then
return
end
local subset = subsets[1]
local addresses = subset.addresses
if addresses == nil or #addresses == 0 then
return
end
local ports = subset.ports
if ports == nil or #ports == 0 then
return
end
local t = core.tablepool.fetch("k8s#endpoint#subsets", 0, #ports)
for _, port in ipairs(ports) do
local nodes = core.table.new(#addresses, 0)
for i, address in ipairs(addresses) do
nodes[i] = {
host = address.ip,
port = port.port,
weight = default_weight
}
end
core.table.sort(nodes, sort_by_key_host)
t[port.name] = nodes
end
core.log.info("save to cache : ", core.json.encode(t), "version : ", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(t, true))
core.tablepool.release("k8s#endpoint#subsets", t)
end
local watch_resource_list = {{
version = "v1",
kind = "Endpoints",
listKind = "EndpointsList",
plural = "endpoints",
list_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
list_query = function(self, continue)
if continue == nil or continue == "" then
return "limit=30"
else
return "limit=30&continue=" .. continue
end
end,
watch_path = function(self)
return string.format("/api/v1/namespaces/%s/endpoints", namespace)
end,
watch_query = function(self, timeout)
return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
self.max_resource_version)
end,
max_resource_version = 0,
added_callback = function(self, object, drive)
on_endpoint_added(object)
end,
modified_callback = function(self, object)
on_endpoint_modified(object)
end,
deleted_callback = function(self, object)
on_endpoint_deleted(object)
end,
pre_list_callback = function(self)
shared_endpoints:flush_all()
end,
post_list_callback = function(self)
shared_endpoints:flush_expired()
end
}}
local watch_threads = {}
local function event_dispatch(resource, event, object, drive)
local rvstr = object.metadata.resourceVersion
if rvstr ~= nil then
local rv = tonumber(rvstr)
if resource.max_resource_version < rv then
resource.max_resource_version = rv
end
end
if event == "ADDED" then
resource:added_callback(object, drive)
elseif event == "MODIFIED" then
if object.deletionTimestamp ~= nil then
resource:deleted_callback(object)
else
resource:modified_callback(object)
end
elseif event == "DELETED" then
resource.deleted_callback(object)
elseif event == "BOOKMARK" then
-- do nothing because we had record max_resource_version to resource.max_resource_version
end
end
local function list_resource(httpc, resource, continue)
httpc:set_timeouts(2000, 2000, 3000)
local res, err = httpc:request({
path = resource:list_path(),
query = resource:list_query(),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if not res then
return false, "RequestError", err or ""
end
if res.status ~= 200 then
return false, res.reason, res.read_body() or ""
end
local body, err = res:read_body()
if err then
return false, "ReadBodyError", err
end
local data, _ = core.json.decode(body)
if not data or data.kind ~= resource.listKind then
return false, "UnexpectedBody", body
end
local resource_version = data.metadata.resourceVersion
core.log.info("list resource version ", resource_version)
if resource_version ~= nil then
local rvv = tonumber(resource_version)
if rvv <= resource.max_resource_version then
return
end
resource.max_resource_version = rvv
end
for _, item in ipairs(data.items) do
event_dispatch(resource, "ADDED", item, "listing")
end
if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
list_resource(httpc, resource, data.metadata.continue)
end
return true, "Success", ""
end
local function watch_resource(httpc, resource)
math.randomseed(process.get_master_pid())
local watch_seconds = 8800 + math.random(200, 1000)
local allowance_seconds = 100
httpc:set_timeouts(2000, 2000, (watch_seconds + allowance_seconds) * 1000)
local res, err = httpc:request({
path = resource:watch_path(),
query = resource:watch_query(watch_seconds),
headers = {
["Authorization"] = string.format("Bearer %s", token)
}
})
if err then
return false, "RequestError", err
end
if res.status ~= 200 then
return false, res.reason, res.read_body and res.read_body()
end
local remaindBody = ""
local body = ""
local reader = res.body_reader
local gmatchIterator;
local captures;
local capturedSize = 0
while true do
body, err = reader()
if err then
return false, "ReadBodyError", err
end
if not body then
break
end
if #remaindBody ~= 0 then
body = remaindBody .. body
end
gmatchIterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
if not gmatchIterator then
return false, "GmatchError", err
end
while true do
captures, err = gmatchIterator()
if err then
return false, "GmatchError", err
end
if not captures then
break
end
capturedSize = capturedSize + #captures[0]
local v, _ = core.json.decode(captures[0])
if not v or not v.object or v.object.kind ~= resource.kind then
return false, "UnexpectedBody", captures[0]
end
event_dispatch(resource, v.type, v.object, "watching")
end
if capturedSize == #body then
remaindBody = ""
elseif capturedSize == 0 then
remaindBody = body
else
remaindBody = string.sub(body, capturedSize + 1)
end
end
watch_resource(httpc, resource)
end
local function fetch_resource(resource)
while true do
local intervalTime = 0
local reason, message = "", ""
local ok = false
repeat
local httpc = http.new()
resource.watch_state = "connecting"
core.log.info("begin to connect ", resource.plural)
ok, message = httpc:connect({
scheme = "https",
host = apiserver_host,
port = tonumber(apiserver_port),
ssl_verify = false
})
if not ok then
resource.watch_state = "connecting"
core.log
.error("connect failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 200
break
end
core.log.info("begin to list ", resource.plural)
resource.watch_state = "listing"
resource:pre_list_callback()
ok, reason, message = list_resource(httpc, resource)
if not ok then
resource.watch_state = "listing failed"
core.log.error("list failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "listing finished"
resource:post_list_callback()
core.log.info("begin to watch ", resource.plural)
resource.watch_state = "watching"
ok, reason, message = watch_resource(httpc, resource)
if not ok then
resource.watch_state = "watching failed"
core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
intervalTime = 100
break
end
resource.watch_state = "watching finished"
intervalTime = 0
until true
ngx.sleep(intervalTime)
end
end
local function fetch_resources()
for i, resource in ipairs(watch_resource_list) do
watch_threads[i] = ngx.thread.spawn(fetch_resource, resource)
end
ngx.thread.wait(core.table.unpack(watch_threads))
end
local function end_world(reason)
core.log.warn("send USER1 signal to master process [", process.get_master_pid(), "] for reopening log file")
-- -- local ok, err = signal.kill(process.get_master_pid(), signal.signum("USR1"))
-- if not ok then
-- core.log.error("failed to send USER1 signal for reopening log file: ", err)
-- end
end
local schema = {
type = "object",
properties = {},
additionalProperties = false
}
local _M = {
version = 0.1
}
function _M.check_schema(conf)
return true
end
function _M.nodes(service_name)
local pattern = "([a-z][a-z0-9-.]{0,})[:]([a-z][a-z0-9-.]{0,})$"
local match, err = ngx.re.match(service_name, pattern, "jiao")
if not match then
core.log.error("get nodes for service error: ", err or "")
return nil
end
local k8s_service_name = match[1]
local k8s_port_name = match[2]
local version, _, _ = shared_endpoints:get_stale(k8s_service_name .. "#version")
if not version then
core.log.error("get version: ", version)
return nil
end
core.log.error("get version: ", version)
return lrucache(service_name, version, create_nodes, k8s_service_name, k8s_port_name)
end
function _M.init_worker()
if process.type() ~= "privileged agent" then
return
end
local ok, err = core.schema.check(schema, local_conf.discovery.k8s)
if not ok then
error("invalid k8s discovery configuration: " .. err)
return
end
local err
namespace, err = util.read_file("/home/adugeek/Temp/namespace")
if err then
end_world(err)
return
end
core.log.info("here")
token, err = util.read_file("/home/adugeek/Temp/token")
if err then
end_world(err)
return
end
core.log.info("here")
-- apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
-- apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
apiserver_host = "127.0.0.1"
apiserver_port = "8001"
ngx_timer_at(0, fetch_resources)
end
return _M
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860195520
I had commit this implement.
link is [https://github.com/adugeek/ApisixController](url)
@tokers
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900796441
@tao12345666333 @tokers @lookbrook @zou8944
sorry, I will create pr in with on week .
but , I need some one teach me how to create test cases
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900970828
> @tao12345666333 @tokers @lookbrook @zou8944
> sorry, I will create pr in on week .
> but , I need some one teach me how to create test cases
See https://github.com/apache/apisix/blob/master/docs/en/latest/internal/testing-framework.md for details.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] tokers commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
tokers commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-900970828
> @tao12345666333 @tokers @lookbrook @zou8944
> sorry, I will create pr in on week .
> but , I need some one teach me how to create test cases
See https://github.com/apache/apisix/blob/master/docs/en/latest/internal/testing-framework.md for details.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] adugeek commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
adugeek commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-860531094
Not yet, under testing now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] lookbrook commented on issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
lookbrook commented on issue #4388:
URL: https://github.com/apache/apisix/issues/4388#issuecomment-880543329
The k8s endpoints discovery is very useful,I think it should be merge into apisix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [apisix] zhixiongdu027 closed issue #4388: Another implementation of ingress controller
Posted by GitBox <gi...@apache.org>.
zhixiongdu027 closed issue #4388:
URL: https://github.com/apache/apisix/issues/4388
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org