You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by to...@apache.org on 2021/05/06 02:30:56 UTC
[apisix] branch master updated: feat: support to use upstream_id in stream_route (#4121)
This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3874a feat: support to use upstream_id in stream_route (#4121)
3a3874a is described below
commit 3a3874a2cb438854af0c6612f7e4b477a5933f98
Author: Alex Zhang <zc...@gmail.com>
AuthorDate: Thu May 6 10:30:46 2021 +0800
feat: support to use upstream_id in stream_route (#4121)
---
apisix/init.lua | 78 +++++++++++++++++++++++++++++++++-----------------
t/stream-node/sanity.t | 67 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 119 insertions(+), 26 deletions(-)
diff --git a/apisix/init.lua b/apisix/init.lua
index e090377..6225fcc 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -42,12 +42,16 @@ local str_byte = string.byte
local str_sub = string.sub
local tonumber = tonumber
local control_api_router
+
+local is_http = false
if ngx.config.subsystem == "http" then
+ is_http = true
control_api_router = require("apisix.control.router")
end
+
local load_balancer
local local_conf
-local ver_header = "APISIX/" .. core.version.VERSION
+local ver_header = "APISIX/" .. core.version.VERSION
local _M = {version = 0.4}
@@ -118,7 +122,7 @@ function _M.http_init_worker()
end
require("apisix.debug").init_worker()
- require("apisix.upstream").init_worker()
+ apisix_upstream.init_worker()
require("apisix.plugins.ext-plugin.init").init_worker()
local_conf = core.config.local_conf()
@@ -254,6 +258,38 @@ local function set_upstream_host(api_ctx)
end
+local function get_upstream_by_id(up_id)
+ local upstreams = core.config.fetch_created_obj("/upstreams")
+ if upstreams then
+ local upstream = upstreams:get(tostring(up_id))
+ if not upstream then
+ core.log.error("failed to find upstream by id: " .. up_id)
+ if is_http then
+ return core.response.exit(502)
+ end
+
+ return ngx_exit(1)
+ end
+
+ if upstream.has_domain then
+ local err
+ upstream, err = parse_domain_in_up(upstream)
+ if err then
+ core.log.error("failed to get resolved upstream: ", err)
+ if is_http then
+ return core.response.exit(500)
+ end
+
+ return ngx_exit(1)
+ end
+ end
+
+ core.log.info("parsed upstream: ", core.json.delay_encode(upstream))
+ return upstream.dns_value or upstream.value
+ end
+end
+
+
function _M.http_access_phase()
local ngx_ctx = ngx.ctx
@@ -385,30 +421,12 @@ function _M.http_access_phase()
end
if up_id then
- local upstreams = core.config.fetch_created_obj("/upstreams")
- if upstreams then
- local upstream = upstreams:get(tostring(up_id))
- if not upstream then
- core.log.error("failed to find upstream by id: " .. up_id)
- return core.response.exit(502)
- end
-
- if upstream.has_domain then
- local err
- upstream, err = parse_domain_in_up(upstream)
- if err then
- core.log.error("failed to get resolved upstream: ", err)
- return core.response.exit(500)
- end
- end
-
- if upstream.value.pass_host then
- api_ctx.pass_host = upstream.value.pass_host
- api_ctx.upstream_host = upstream.value.upstream_host
- end
+ local upstream = get_upstream_by_id(up_id)
+ api_ctx.matched_upstream = upstream
- core.log.info("parsed upstream: ", core.json.delay_encode(upstream))
- api_ctx.matched_upstream = upstream.dns_value or upstream.value
+ if upstream and upstream.pass_host then
+ api_ctx.pass_host = upstream.pass_host
+ api_ctx.upstream_host = upstream.upstream_host
end
else
@@ -722,6 +740,7 @@ function _M.stream_init_worker()
plugin.init_worker()
router.stream_init_worker()
+ apisix_upstream.init_worker()
if core.config == require("apisix.core.config_yaml") then
core.config.init_worker()
@@ -756,11 +775,18 @@ function _M.stream_preread_phase()
return ngx_exit(1)
end
+
+ local up_id = matched_route.value.upstream_id
+ if up_id then
+ api_ctx.matched_upstream = get_upstream_by_id(up_id)
+ else
+ api_ctx.matched_upstream = matched_route.value.upstream
+ end
+
local plugins = core.tablepool.fetch("plugins", 32, 0)
api_ctx.plugins = plugin.stream_filter(matched_route, plugins)
-- core.log.info("valid plugins: ", core.json.delay_encode(plugins, true))
- api_ctx.matched_upstream = matched_route.value.upstream
api_ctx.conf_type = "stream/route"
api_ctx.conf_version = matched_route.modifiedIndex
api_ctx.conf_id = matched_route.value.id
diff --git a/t/stream-node/sanity.t b/t/stream-node/sanity.t
index 7eaf06e..cd4e019 100644
--- a/t/stream-node/sanity.t
+++ b/t/stream-node/sanity.t
@@ -163,3 +163,70 @@ GET /t
passed
--- no_error_log
[error]
+
+
+
+=== TEST 7: set upstream (id: 1)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/upstreams/1',
+ ngx.HTTP_PUT,
+ [[{
+ "nodes": {
+ "127.0.0.1:1995": 1
+ },
+ "type": "roundrobin"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: set stream route (id: 1) which uses upstream_id
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "remote_addr": "127.0.0.1",
+ "upstream_id": "1"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 9: hit route
+--- stream_enable
+--- stream_request eval
+mmm
+--- stream_response
+hello world
+--- no_error_log
+[error]