You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by me...@apache.org on 2020/06/22 06:21:50 UTC
[incubator-apisix] branch master updated: refactory: collect
`upstream` logic and put them in a single file. (#1734)
This is an automated email from the ASF dual-hosted git repository.
membphis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 748e337 refactory: collect `upstream` logic and put them in a single file. (#1734)
748e337 is described below
commit 748e33756bf2095c5baed67bd32e7b05f20377ae
Author: YuanSheng Wang <me...@gmail.com>
AuthorDate: Mon Jun 22 14:21:42 2020 +0800
refactory: collect `upstream` logic and put them in a single file. (#1734)
feature: support setting a dynamic upstream from within a plugin.
Here is a minimal example for the `access` phase of a plugin:
```lua
local up_conf = {
type = "roundrobin",
nodes = {
{host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
}
}
local ok, err = upstream.check_schema(up_conf)
if not ok then
return 500, err
end
local matched_route = ctx.matched_route
upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
ctx.conf_version, up_conf, matched_route)
return
```
---
apisix/balancer.lua | 93 +--------------------
apisix/init.lua | 65 ++++-----------
apisix/plugins/example-plugin.lua | 26 +++---
apisix/stream/plugins/mqtt-proxy.lua | 35 ++++----
apisix/upstream.lua | 154 +++++++++++++++++++++++++++++++++++
t/admin/balancer.t | 119 ++++++++++++---------------
t/node/not-exist-upstream.t | 2 +-
t/stream-plugin/mqtt-proxy.t | 3 -
8 files changed, 260 insertions(+), 237 deletions(-)
diff --git a/apisix/balancer.lua b/apisix/balancer.lua
index 9ba7185..8b544fe 100644
--- a/apisix/balancer.lua
+++ b/apisix/balancer.lua
@@ -19,15 +19,11 @@ local require = require
local discovery = require("apisix.discovery.init").discovery
local balancer = require("ngx.balancer")
local core = require("apisix.core")
-local error = error
-local pairs = pairs
local ipairs = ipairs
local tostring = tostring
-
local set_more_tries = balancer.set_more_tries
local get_last_failure = balancer.get_last_failure
local set_timeouts = balancer.set_timeouts
-local upstreams_etcd
local module_name = "balancer"
@@ -150,38 +146,10 @@ end
local function pick_server(route, ctx)
core.log.info("route: ", core.json.delay_encode(route, true))
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
- local healthcheck_parent = route
- local up_id = route.value.upstream_id
- local up_conf = (route.dns_value and route.dns_value.upstream)
- or route.value.upstream
- if not up_id and not up_conf then
- return nil, nil, "missing upstream configuration"
- end
-
- local version
- local key
-
- if up_id then
- if not upstreams_etcd then
- return nil, nil, "need to create a etcd instance for fetching "
- .. "upstream information"
- end
-
- local up_obj = upstreams_etcd:get(tostring(up_id))
- if not up_obj then
- return nil, nil, "failed to find upstream by id: " .. up_id
- end
- core.log.info("upstream: ", core.json.delay_encode(up_obj))
-
- healthcheck_parent = up_obj
- up_conf = up_obj.dns_value or up_obj.value
- version = up_obj.modifiedIndex
- key = up_conf.type .. "#upstream_" .. up_id
-
- else
- version = ctx.conf_version
- key = up_conf.type .. "#route_" .. route.value.id
- end
+ local healthcheck_parent = ctx.upstream_healthcheck_parent
+ local up_conf = ctx.upstream_conf
+ local version = ctx.upstream_version
+ local key = ctx.upstream_key
if up_conf.service_name then
if not discovery then
@@ -277,59 +245,6 @@ end
function _M.init_worker()
- local err
- upstreams_etcd, err = core.config.new("/upstreams", {
- automatic = true,
- item_schema = core.schema.upstream,
- filter = function(upstream)
- upstream.has_domain = false
- if not upstream.value or not upstream.value.nodes then
- return
- end
-
- local nodes = upstream.value.nodes
- if core.table.isarray(nodes) then
- for _, node in ipairs(nodes) do
- local host = node.host
- if not core.utils.parse_ipv4(host) and
- not core.utils.parse_ipv6(host) then
- upstream.has_domain = true
- break
- end
- end
- else
- local new_nodes = core.table.new(core.table.nkeys(nodes), 0)
- for addr, weight in pairs(nodes) do
- local host, port = core.utils.parse_addr(addr)
- if not core.utils.parse_ipv4(host) and
- not core.utils.parse_ipv6(host) then
- upstream.has_domain = true
- end
- local node = {
- host = host,
- port = port,
- weight = weight,
- }
- core.table.insert(new_nodes, node)
- end
- upstream.value.nodes = new_nodes
- end
-
- core.log.info("filter upstream: ", core.json.delay_encode(upstream))
- end,
- })
- if not upstreams_etcd then
- error("failed to create etcd instance for fetching upstream: " .. err)
- return
- end
-end
-
-function _M.upstreams()
- if not upstreams_etcd then
- return nil, nil
- end
-
- return upstreams_etcd.values, upstreams_etcd.conf_version
end
return _M
diff --git a/apisix/init.lua b/apisix/init.lua
index be1afac..35f6076 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -22,6 +22,7 @@ local service_fetch = require("apisix.http.service").get
local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
+local set_upstream = require("apisix.upstream").set_by_route
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
@@ -92,6 +93,7 @@ function _M.http_init_worker()
end
require("apisix.debug").init_worker()
+ require("apisix.upstream").init_worker()
local local_conf = core.config.local_conf()
local dns_resolver_valid = local_conf and local_conf.apisix and
@@ -114,29 +116,6 @@ local function run_plugin(phase, plugins, api_ctx)
return api_ctx
end
- if phase == "balancer" then
- local balancer_name = api_ctx.balancer_name
- local balancer_plugin = api_ctx.balancer_plugin
- if balancer_name and balancer_plugin then
- local phase_fun = balancer_plugin[phase]
- phase_fun(balancer_plugin, api_ctx)
- return api_ctx
- end
-
- for i = 1, #plugins, 2 do
- local phase_fun = plugins[i][phase]
- if phase_fun and
- (not balancer_name or balancer_name == plugins[i].name) then
- phase_fun(plugins[i + 1], api_ctx)
- if api_ctx.balancer_name == plugins[i].name then
- api_ctx.balancer_plugin = plugins[i]
- return api_ctx
- end
- end
- end
- return api_ctx
- end
-
if phase ~= "log"
and phase ~= "header_filter"
and phase ~= "body_filter"
@@ -383,6 +362,12 @@ function _M.http_access_phase()
end
end
run_plugin("access", plugins, api_ctx)
+
+ local ok, err = set_upstream(route, api_ctx)
+ if not ok then
+ core.log.error("failed to parse upstream: ", err)
+ core.response.exit(500)
+ end
end
@@ -443,6 +428,8 @@ function _M.grpc_access_phase()
run_plugin("rewrite", plugins, api_ctx)
run_plugin("access", plugins, api_ctx)
+
+ set_upstream(route, api_ctx)
end
@@ -503,19 +490,6 @@ function _M.http_balancer_phase()
return core.response.exit(500)
end
- -- first time
- if not api_ctx.balancer_name then
- run_plugin("balancer", nil, api_ctx)
- if api_ctx.balancer_name then
- return
- end
- end
-
- if api_ctx.balancer_name and api_ctx.balancer_name ~= "default" then
- return run_plugin("balancer", nil, api_ctx)
- end
-
- api_ctx.balancer_name = "default"
load_balancer(api_ctx.matched_route, api_ctx)
end
@@ -615,7 +589,13 @@ function _M.stream_preread_phase()
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))
+ api_ctx.conf_type = "stream/route"
+ api_ctx.conf_version = matched_route.modifiedIndex
+ api_ctx.conf_id = matched_route.value.id
+
run_plugin("preread", plugins, api_ctx)
+
+ set_upstream(matched_route, api_ctx)
end
@@ -627,19 +607,6 @@ function _M.stream_balancer_phase()
return ngx_exit(1)
end
- -- first time
- if not api_ctx.balancer_name then
- run_plugin("balancer", nil, api_ctx)
- if api_ctx.balancer_name then
- return
- end
- end
-
- if api_ctx.balancer_name and api_ctx.balancer_name ~= "default" then
- return run_plugin("balancer", nil, api_ctx)
- end
-
- api_ctx.balancer_name = "default"
load_balancer(api_ctx.matched_route, api_ctx)
end
diff --git a/apisix/plugins/example-plugin.lua b/apisix/plugins/example-plugin.lua
index 025ade4..bf36837 100644
--- a/apisix/plugins/example-plugin.lua
+++ b/apisix/plugins/example-plugin.lua
@@ -15,7 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
-local balancer = require("ngx.balancer")
+local upstream = require("apisix.upstream")
local schema = {
type = "object",
@@ -60,25 +60,27 @@ end
function _M.access(conf, ctx)
core.log.warn("plugin access phase, conf: ", core.json.encode(conf))
-- return 200, {message = "hit example plugin"}
-end
-
-
-function _M.balancer(conf, ctx)
- core.log.warn("plugin balancer phase, conf: ", core.json.encode(conf))
if not conf.ip then
return
end
- -- NOTE: update `ctx.balancer_name` is important, APISIX will skip other
- -- balancer handler.
- ctx.balancer_name = plugin_name
+ local up_conf = {
+ type = "roundrobin",
+ nodes = {
+ {host = conf.ip, port = conf.port, weight = 1}
+ }
+ }
- local ok, err = balancer.set_current_peer(conf.ip, conf.port)
+ local ok, err = upstream.check_schema(up_conf)
if not ok then
- core.log.error("failed to set server peer: ", err)
- return core.response.exit(502)
+ return 500, err
end
+
+ local matched_route = ctx.matched_route
+ upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+ ctx.conf_version, up_conf, matched_route)
+ return
end
diff --git a/apisix/stream/plugins/mqtt-proxy.lua b/apisix/stream/plugins/mqtt-proxy.lua
index f8d3552..b533430 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -15,8 +15,8 @@
-- limitations under the License.
--
local core = require("apisix.core")
-local balancer = require("ngx.balancer")
-local bit = require "bit"
+local upstream = require("apisix.upstream")
+local bit = require("bit")
local ngx = ngx
local ngx_exit = ngx.exit
local str_byte = string.byte
@@ -158,25 +158,28 @@ function _M.preread(conf, ctx)
end
core.log.info("mqtt client id: ", res.client_id)
-end
+ local up_conf = {
+ type = "roundrobin",
+ nodes = {
+ {host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
+ }
+ }
-function _M.log(conf, ctx)
- core.log.info("plugin log phase, conf: ", core.json.encode(conf))
-end
+ local ok, err = upstream.check_schema(up_conf)
+ if not ok then
+ return 500, err
+ end
+ local matched_route = ctx.matched_route
+ upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id,
+ ctx.conf_version, up_conf, matched_route)
+ return
+end
-function _M.balancer(conf, ctx)
- core.log.info("plugin balancer phase, conf: ", core.json.encode(conf))
- -- ctx.balancer_name = plugin_name
- local up = conf.upstream
- ctx.balancer_name = plugin_name
- local ok, err = balancer.set_current_peer(up.ip, up.port)
- if not ok then
- core.log.error("failed to set server peer: ", err)
- return ngx_exit(1)
- end
+function _M.log(conf, ctx)
+ core.log.info("plugin log phase, conf: ", core.json.encode(conf))
end
diff --git a/apisix/upstream.lua b/apisix/upstream.lua
new file mode 100644
index 0000000..203b713
--- /dev/null
+++ b/apisix/upstream.lua
@@ -0,0 +1,154 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core = require("apisix.core")
+local error = error
+local tostring = tostring
+local ipairs = ipairs
+local pairs = pairs
+local upstreams
+
+
+local _M = {}
+
+
+local function set_directly(ctx, key, ver, conf, parent)
+ if not ctx then
+ error("missing argument ctx", 2)
+ end
+ if not key then
+ error("missing argument key", 2)
+ end
+ if not ver then
+ error("missing argument ver", 2)
+ end
+ if not conf then
+ error("missing argument conf", 2)
+ end
+ if not parent then
+ error("missing argument parent", 2)
+ end
+
+ ctx.upstream_conf = conf
+ ctx.upstream_version = ver
+ ctx.upstream_key = key
+ ctx.upstream_healthcheck_parent = parent
+ return
+end
+_M.set = set_directly
+
+
+function _M.set_by_route(route, api_ctx)
+ if api_ctx.upstream_conf then
+ return true
+ end
+
+ local up_id = route.value.upstream_id
+ if up_id then
+ if not upstreams then
+ return false, "need to create a etcd instance for fetching "
+ .. "upstream information"
+ end
+
+ local up_obj = upstreams:get(tostring(up_id))
+ if not up_obj then
+ return false, "failed to find upstream by id: " .. up_id
+ end
+ core.log.info("upstream: ", core.json.delay_encode(up_obj))
+
+ local up_conf = up_obj.dns_value or up_obj.value
+ set_directly(api_ctx, up_conf.type .. "#upstream_" .. up_id,
+ up_obj.modifiedIndex, up_conf, up_obj)
+ return true
+ end
+
+ local up_conf = (route.dns_value and route.dns_value.upstream)
+ or route.value.upstream
+ if not up_conf then
+ return false, "missing upstream configuration in Route or Service"
+ end
+
+ set_directly(api_ctx, up_conf.type .. "#route_" .. route.value.id,
+ api_ctx.conf_version, up_conf, route)
+ return true
+end
+
+
+function _M.upstreams()
+ if not upstreams then
+ return nil, nil
+ end
+
+ return upstreams.values, upstreams.conf_version
+end
+
+
+function _M.check_schema(conf)
+ return core.schema.check(core.schema.upstream, conf)
+end
+
+
+function _M.init_worker()
+ local err
+ upstreams, err = core.config.new("/upstreams", {
+ automatic = true,
+ item_schema = core.schema.upstream,
+ filter = function(upstream)
+ upstream.has_domain = false
+ if not upstream.value or not upstream.value.nodes then
+ return
+ end
+
+ local nodes = upstream.value.nodes
+ if core.table.isarray(nodes) then
+ for _, node in ipairs(nodes) do
+ local host = node.host
+ if not core.utils.parse_ipv4(host) and
+ not core.utils.parse_ipv6(host) then
+ upstream.has_domain = true
+ break
+ end
+ end
+ else
+ local new_nodes = core.table.new(core.table.nkeys(nodes), 0)
+ for addr, weight in pairs(nodes) do
+ local host, port = core.utils.parse_addr(addr)
+ if not core.utils.parse_ipv4(host) and
+ not core.utils.parse_ipv6(host) then
+ upstream.has_domain = true
+ end
+ local node = {
+ host = host,
+ port = port,
+ weight = weight,
+ }
+ core.table.insert(new_nodes, node)
+ end
+ upstream.value.nodes = new_nodes
+ end
+
+ core.log.info("filter upstream: ", core.json.delay_encode(upstream))
+ end,
+ })
+ if not upstreams then
+ error("failed to create etcd instance for fetching upstream: " .. err)
+ return
+ end
+end
+
+
+return _M
diff --git a/t/admin/balancer.t b/t/admin/balancer.t
index 7054d22..c3ec573 100644
--- a/t/admin/balancer.t
+++ b/t/admin/balancer.t
@@ -26,18 +26,19 @@ add_block_preprocessor(sub {
my $init_by_lua_block = <<_EOC_;
require "resty.core"
apisix = require("apisix")
+ core = require("apisix.core")
apisix.http_init()
function test(route, ctx, count)
local balancer = require("apisix.balancer")
- local router = require("apisix.router")
- router.filter_test(route)
local res = {}
for i = 1, count or 12 do
local host, port, err = balancer.pick_server(route, ctx)
if err then
ngx.say("failed: ", err)
end
+
+ core.log.warn("host: ", host, " port: ", port)
res[host] = (res[host] or 0) + 1
end
@@ -63,20 +64,18 @@ __DATA__
--- config
location /t {
content_by_lua_block {
- local route = {
- value = {
- upstream = {
- nodes = {
- ["39.97.63.215:80"] = 1,
- ["39.97.63.216:81"] = 1,
- ["39.97.63.217:82"] = 1,
- },
- type = "roundrobin",
- },
- id = 1
- }
+ local up_conf = {
+ type = "roundrobin",
+ nodes = {
+ {host = "39.97.63.215", port = 80, weight = 1},
+ {host = "39.97.63.216", port = 81, weight = 1},
+ {host = "39.97.63.217", port = 82, weight = 1},
}
+ }
local ctx = {conf_version = 1}
+ ctx.upstream_conf = up_conf
+ ctx.upstream_version = "ver"
+ ctx.upstream_key = up_conf.type .. "#route_" .. "id"
test(route, ctx)
}
@@ -96,23 +95,18 @@ host: 39.97.63.217 count: 4
--- config
location /t {
content_by_lua_block {
- local core = require("apisix.core")
- local balancer = require("apisix.balancer")
-
- local route = {
- value = {
- upstream = {
- nodes = {
- ["39.97.63.215:80"] = 1,
- ["39.97.63.216:81"] = 2,
- ["39.97.63.217:82"] = 3,
- },
- type = "roundrobin",
- },
- id = 1
- }
+ local up_conf = {
+ type = "roundrobin",
+ nodes = {
+ {host = "39.97.63.215", port = 80, weight = 1},
+ {host = "39.97.63.216", port = 81, weight = 2},
+ {host = "39.97.63.217", port = 82, weight = 3},
}
+ }
local ctx = {conf_version = 1}
+ ctx.upstream_conf = up_conf
+ ctx.upstream_version = "ver"
+ ctx.upstream_key = up_conf.type .. "#route_" .. "id"
test(route, ctx)
}
@@ -132,33 +126,29 @@ host: 39.97.63.217 count: 6
--- config
location /t {
content_by_lua_block {
- local balancer = require("apisix.balancer")
-
- local route = {
- value = {
- upstream = {
- nodes = {
- ["39.97.63.215:80"] = 1,
- ["39.97.63.216:81"] = 1,
- ["39.97.63.217:82"] = 1,
- },
- type = "roundrobin",
- },
- id = 1
- }
+ local up_conf = {
+ type = "roundrobin",
+ nodes = {
+ {host = "39.97.63.215", port = 80, weight = 1},
+ {host = "39.97.63.216", port = 81, weight = 1},
+ {host = "39.97.63.217", port = 82, weight = 1},
}
- local ctx = {conf_version = 1}
+ }
+ local ctx = {}
+ ctx.upstream_conf = up_conf
+ ctx.upstream_version = 1
+ ctx.upstream_key = up_conf.type .. "#route_" .. "id"
test(route, ctx)
-- cached by version
- route.value.upstream.nodes = {
- ["39.97.63.218:83"] = 1,
+ up_conf.nodes = {
+ {host = "39.97.63.218", port = 80, weight = 1},
}
test(route, ctx)
-- update, version changed
- ctx = {conf_version = 2}
+ ctx.upstream_version = 2
test(route, ctx)
}
}
@@ -181,37 +171,32 @@ host: 39.97.63.218 count: 12
--- config
location /t {
content_by_lua_block {
- local route = {
- value = {
- upstream = {
- nodes = {
- ["39.97.63.215:80"] = 1,
- ["39.97.63.216:81"] = 1,
- ["39.97.63.217:82"] = 1,
- },
- type = "chash",
- key = "remote_addr",
- },
- id = 1
- }
+ local up_conf = {
+ type = "chash",
+ key = "remote_addr",
+ nodes = {
+ {host = "39.97.63.215", port = 80, weight = 1},
+ {host = "39.97.63.216", port = 81, weight = 1},
+ {host = "39.97.63.217", port = 82, weight = 1},
}
+ }
local ctx = {
- conf_version = 1,
- var = {
- remote_addr = "127.0.0.1"
- }
+ var = {remote_addr = "127.0.0.1"},
}
+ ctx.upstream_conf = up_conf
+ ctx.upstream_version = 1
+ ctx.upstream_key = up_conf.type .. "#route_" .. "id"
test(route, ctx)
-- cached by version
- route.value.upstream.nodes = {
- ["39.97.63.218:83"] = 1,
+ up_conf.nodes = {
+ {host = "39.97.63.218", port = 80, weight = 1},
}
test(route, ctx)
-- update, version changed
- ctx.conf_version = 2
+ ctx.upstream_version = 2
test(route, ctx)
}
}
diff --git a/t/node/not-exist-upstream.t b/t/node/not-exist-upstream.t
index a5a008d..73684ea 100644
--- a/t/node/not-exist-upstream.t
+++ b/t/node/not-exist-upstream.t
@@ -83,4 +83,4 @@ qr/502 Bad Gateway|500 Internal Server Error/
--- grep_error_log eval
qr/\[error\].*/
--- grep_error_log_out eval
-qr/failed to pick server: missing upstream configuration while connecting to upstream/
+qr/missing upstream configuration in Route or Service/
diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t
index 7556fcd..82f5453 100644
--- a/t/stream-plugin/mqtt-proxy.t
+++ b/t/stream-plugin/mqtt-proxy.t
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-BEGIN {
- $ENV{TEST_NGINX_USE_HUP} = 1;
-}
use t::APISIX 'no_plan';