Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/10/23 01:58:09 UTC

[apisix] branch master updated: perf: simple setup upstream (#8130)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3887162bc perf: simple setup upstream (#8130)
3887162bc is described below

commit 3887162bc387b89c121f1ccbd823848a388cd6f5
Author: tzssangglass <tz...@gmail.com>
AuthorDate: Sun Oct 23 09:58:00 2022 +0800

    perf: simple setup upstream (#8130)
---
 apisix/http/route.lua    |   2 +-
 apisix/init.lua          | 214 ++++++++++++++++++++-------------------
 apisix/plugins/ai.lua    | 146 +++++++++++++++++++++++----
 conf/config-default.yaml |   2 +-
 t/admin/plugins.t        |   2 +-
 t/core/config.t          |   2 +-
 t/debug/dynamic-hook.t   |   5 +
 t/plugin/ai.t            | 255 +++++++++++++++++++++++++++++++++++++++++++++++
 8 files changed, 501 insertions(+), 127 deletions(-)

diff --git a/apisix/http/route.lua b/apisix/http/route.lua
index 6292b577a..d475646b5 100644
--- a/apisix/http/route.lua
+++ b/apisix/http/route.lua
@@ -92,7 +92,7 @@ function _M.create_radixtree_uri_router(routes, uri_routes, with_parameter)
         end
     end
 
-    event.push(event.CONST.BUILD_ROUTER, uri_routes)
+    event.push(event.CONST.BUILD_ROUTER, routes)
     core.log.info("route items: ", core.json.delay_encode(uri_routes, true))
 
     if with_parameter then
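
The only functional change in route.lua is the event payload: BUILD_ROUTER subscribers now receive the raw route items (routes) instead of the compiled radixtree entries (uri_routes), so a subscriber can inspect the original route configuration. A minimal sketch of such a subscriber, assuming only the core.event API already used in this commit; the handler name and body are illustrative, not the real implementation:

    -- minimal sketch of a BUILD_ROUTER subscriber; handler name/body illustrative
    local event = require("apisix.core.event")

    local function on_routes_rebuilt(routes)
        for _, route in ipairs(routes) do
            -- each item is the raw config object, so fields like
            -- route.value.upstream or route.value.plugins can be inspected here
        end
    end

    event.register(event.CONST.BUILD_ROUTER, on_routes_rebuilt)
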
diff --git a/apisix/init.lua b/apisix/init.lua
index 2030e0241..883dbd9ab 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -348,6 +348,115 @@ local function common_phase(phase_name)
 end
 
 
+
+function _M.handle_upstream(api_ctx, route, enable_websocket)
+    local up_id = route.value.upstream_id
+
+    -- used for the traffic-split plugin
+    if api_ctx.upstream_id then
+        up_id = api_ctx.upstream_id
+    end
+
+    if up_id then
+        local upstream = apisix_upstream.get_by_id(up_id)
+        if not upstream then
+            if is_http then
+                return core.response.exit(502)
+            end
+
+            return ngx_exit(1)
+        end
+
+        api_ctx.matched_upstream = upstream
+
+    else
+        if route.has_domain then
+            local err
+            route, err = parse_domain_in_route(route)
+            if err then
+                core.log.error("failed to get resolved route: ", err)
+                return core.response.exit(500)
+            end
+
+            api_ctx.conf_version = route.modifiedIndex
+            api_ctx.matched_route = route
+        end
+
+        local route_val = route.value
+
+        api_ctx.matched_upstream = (route.dns_value and
+                                    route.dns_value.upstream)
+                                   or route_val.upstream
+    end
+
+    if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
+        api_ctx.matched_upstream.tls.client_cert_id then
+
+        local cert_id = api_ctx.matched_upstream.tls.client_cert_id
+        local upstream_ssl = router.router_ssl.get_by_id(cert_id)
+        if not upstream_ssl or upstream_ssl.type ~= "client" then
+            local err  = upstream_ssl and
+                "ssl type should be 'client'" or
+                "ssl id [" .. cert_id .. "] not exits"
+            core.log.error("failed to get ssl cert: ", err)
+
+            if is_http then
+                return core.response.exit(502)
+            end
+
+            return ngx_exit(1)
+        end
+
+        core.log.info("matched ssl: ",
+                  core.json.delay_encode(upstream_ssl, true))
+        api_ctx.upstream_ssl = upstream_ssl
+    end
+
+    if enable_websocket then
+        api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
+        api_ctx.var.upstream_connection = api_ctx.var.http_connection
+        core.log.info("enabled websocket for route: ", route.value.id)
+    end
+
+    -- load balancer is not required by kafka upstream, so the upstream
+    -- node selection process is intercepted and left to kafka to
+    -- handle on its own
+    if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
+        return pubsub_kafka.access(api_ctx)
+    end
+
+    local code, err = set_upstream(route, api_ctx)
+    if code then
+        core.log.error("failed to set upstream: ", err)
+        core.response.exit(code)
+    end
+
+    local server, err = load_balancer.pick_server(route, api_ctx)
+    if not server then
+        core.log.error("failed to pick server: ", err)
+        return core.response.exit(502)
+    end
+
+    api_ctx.picked_server = server
+
+    set_upstream_headers(api_ctx, server)
+
+    -- run the before_proxy method in access phase first to avoid always reinit request
+    common_phase("before_proxy")
+
+    local up_scheme = api_ctx.upstream_scheme
+    if up_scheme == "grpcs" or up_scheme == "grpc" then
+        stash_ngx_ctx()
+        return ngx.exec("@grpc_pass")
+    end
+
+    if api_ctx.dubbo_proxy_enabled then
+        stash_ngx_ctx()
+        return ngx.exec("@dubbo_pass")
+    end
+end
+
+
 function _M.http_access_phase()
     local ngx_ctx = ngx.ctx
 
@@ -495,110 +604,7 @@ function _M.http_access_phase()
         plugin.run_plugin("access", plugins, api_ctx)
     end
 
-    local up_id = route.value.upstream_id
-
-    -- used for the traffic-split plugin
-    if api_ctx.upstream_id then
-        up_id = api_ctx.upstream_id
-    end
-
-    if up_id then
-        local upstream = apisix_upstream.get_by_id(up_id)
-        if not upstream then
-            if is_http then
-                return core.response.exit(502)
-            end
-
-            return ngx_exit(1)
-        end
-
-        api_ctx.matched_upstream = upstream
-
-    else
-        if route.has_domain then
-            local err
-            route, err = parse_domain_in_route(route)
-            if err then
-                core.log.error("failed to get resolved route: ", err)
-                return core.response.exit(500)
-            end
-
-            api_ctx.conf_version = route.modifiedIndex
-            api_ctx.matched_route = route
-        end
-
-        local route_val = route.value
-
-        api_ctx.matched_upstream = (route.dns_value and
-                                    route.dns_value.upstream)
-                                   or route_val.upstream
-    end
-
-    if api_ctx.matched_upstream and api_ctx.matched_upstream.tls and
-        api_ctx.matched_upstream.tls.client_cert_id then
-
-        local cert_id = api_ctx.matched_upstream.tls.client_cert_id
-        local upstream_ssl = router.router_ssl.get_by_id(cert_id)
-        if not upstream_ssl or upstream_ssl.type ~= "client" then
-            local err  = upstream_ssl and
-                "ssl type should be 'client'" or
-                "ssl id [" .. cert_id .. "] not exits"
-            core.log.error("failed to get ssl cert: ", err)
-
-            if is_http then
-                return core.response.exit(502)
-            end
-
-            return ngx_exit(1)
-        end
-
-        core.log.info("matched ssl: ",
-                  core.json.delay_encode(upstream_ssl, true))
-        api_ctx.upstream_ssl = upstream_ssl
-    end
-
-    if enable_websocket then
-        api_ctx.var.upstream_upgrade    = api_ctx.var.http_upgrade
-        api_ctx.var.upstream_connection = api_ctx.var.http_connection
-        core.log.info("enabled websocket for route: ", route.value.id)
-    end
-
-    -- load balancer is not required by kafka upstream, so the upstream
-    -- node selection process is intercepted and left to kafka to
-    -- handle on its own
-    if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
-        return pubsub_kafka.access(api_ctx)
-    end
-
-    local code, err = set_upstream(route, api_ctx)
-    if code then
-        core.log.error("failed to set upstream: ", err)
-        core.response.exit(code)
-    end
-
-    local server, err = load_balancer.pick_server(route, api_ctx)
-    if not server then
-        core.log.error("failed to pick server: ", err)
-        return core.response.exit(502)
-    end
-
-    api_ctx.picked_server = server
-
-    set_upstream_headers(api_ctx, server)
-
-    -- run the before_proxy method in access phase first to avoid always reinit request
-    common_phase("before_proxy")
-
-    local up_scheme = api_ctx.upstream_scheme
-    if up_scheme == "grpcs" or up_scheme == "grpc" then
-        stash_ngx_ctx()
-        return ngx.exec("@grpc_pass")
-    end
-
-    if api_ctx.dubbo_proxy_enabled then
-        stash_ngx_ctx()
-        return ngx.exec("@dubbo_pass")
-    end
+    _M.handle_upstream(api_ctx, route, enable_websocket)
 end
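
In init.lua the whole upstream setup block is lifted out of http_access_phase into the exported _M.handle_upstream, so the access phase now simply delegates to it. Exporting it on the module is what lets another component swap in a cheaper implementation at runtime. A minimal sketch of that swap, assuming the require("apisix") handle used elsewhere in this commit; the no-op below is illustrative, the real replacement lives in apisix/plugins/ai.lua further down in this diff:

    -- minimal sketch: stash and swap the exported handle_upstream function
    local apisix = require("apisix")
    local orig_handle_upstream = apisix.handle_upstream

    local function skip_upstream_setup(api_ctx, route, enable_websocket)
        -- nothing to do for "simple" routes; the balancer callback picks the node
    end

    -- install the fast path ...
    apisix.handle_upstream = skip_upstream_setup
    -- ... and restore the full logic once routes stop being "simple"
    apisix.handle_upstream = orig_handle_upstream
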
 
 
diff --git a/apisix/plugins/ai.lua b/apisix/plugins/ai.lua
index cb69f59a7..eeb78ca80 100644
--- a/apisix/plugins/ai.lua
+++ b/apisix/plugins/ai.lua
@@ -15,12 +15,19 @@
 -- limitations under the License.
 --
 local require         = require
+local apisix          = require("apisix")
 local core            = require("apisix.core")
 local router          = require("apisix.router")
 local event           = require("apisix.core.event")
+local load_balancer   = require("apisix.balancer")
+local balancer        = require("ngx.balancer")
+local is_http         = ngx.config.subsystem == "http"
+local enable_keepalive = balancer.enable_keepalive and is_http
 local ipairs          = ipairs
 local pcall           = pcall
 local loadstring      = loadstring
+local type            = type
+local pairs           = pairs
 
 local get_cache_key_func
 local get_cache_key_func_def_render
@@ -50,14 +57,17 @@ local plugin_name = "ai"
 
 local _M = {
     version = 0.1,
-    priority = 25000,
+    priority = 22900,
     name = plugin_name,
     schema = schema,
     scope = "global",
 }
 
 local orig_router_match
+local orig_handle_upstream = apisix.handle_upstream
+local orig_balancer_run = load_balancer.run
 
+local default_keepalive_pool = {}
 
 local function match_route(ctx)
     orig_router_match(ctx)
@@ -100,33 +110,98 @@ local function gen_get_cache_key_func(route_flags)
 end
 
 
-local function routes_analyze(routes)
-    -- TODO: need to add a option in config.yaml to enable this feature(default is true)
-    local route_flags = core.table.new(0, 2)
-    for _, route in ipairs(routes) do
-        if route.methods then
-            route_flags["methods"] = true
-        end
+local function ai_upstream()
+    core.log.info("enable sample upstream")
+end
 
-        if route.host or route.hosts then
-            route_flags["host"] = true
-        end
 
-        if route.vars then
-            route_flags["vars"] = true
+local pool_opt
+local function ai_balancer_run(route)
+    local server = route.value.upstream.nodes[1]
+    if enable_keepalive then
+        local ok, err = balancer.set_current_peer(server.host, server.port or 80, pool_opt)
+        if not ok then
+            core.log.error("failed to set server peer [", server.host, ":",
+                           server.port, "] err: ", err)
+            return ok, err
         end
+        balancer.enable_keepalive(default_keepalive_pool.idle_timeout,
+                                  default_keepalive_pool.requests)
+    else
+        balancer.set_current_peer(server.host, server.port or 80)
+    end
+end
 
-        if route.filter_fun then
-            route_flags["filter_fun"] = true
-        end
+local function routes_analyze(routes)
+    local route_flags = core.table.new(0, 16)
+    local route_up_flags = core.table.new(0, 12)
+    for _, route in ipairs(routes) do
+        if type(route) == "table" then
+            for key, value in pairs(route.value) do
+                -- collect route flags
+                if key == "methods" then
+                    route_flags["methods"] = true
+                elseif key == "host" or key == "hosts" then
+                    route_flags["host"] = true
+                elseif key == "vars" then
+                    route_flags["vars"] = true
+                elseif key == "filter_fun"then
+                    route_flags["filter_fun"] = true
+                elseif key == "remote_addr" or key == "remote_addrs" then
+                    route_flags["remote_addr"] = true
+                elseif key == "service" then
+                    route_flags["service"] = true
+                elseif key == "enable_websocket" then
+                    route_flags["enable_websocket"] = true
+                elseif key == "plugins" then
+                    route_flags["plugins"] = true
+                elseif key == "upstream_id" then
+                    route_flags["upstream_id"] = true
+                elseif key == "service_id" then
+                    route_flags["service_id"] = true
+                elseif key == "plugin_config_id" then
+                    route_flags["plugin_config_id"] = true
+                end
 
-        if route.remote_addr or route.remote_addrs then
-            route_flags["remote_addr"] = true
+                -- collect upstream flags
+                if key == "upstream" then
+                    if value.nodes and #value.nodes == 1 then
+                        for k, v in pairs(value) do
+                            if k == "nodes" then
+                                if (not core.utils.parse_ipv4(v[1].host)
+                                    and not core.utils.parse_ipv6(v[1].host)) then
+                                    route_up_flags["has_domain"] = true
+                                end
+                            elseif k == "pass_host" and v ~= "pass" then
+                                route_up_flags["pass_host"] = true
+                            elseif k == "scheme" and v ~= "http" then
+                                route_up_flags["scheme"] = true
+                            elseif k == "checks" then
+                                route_up_flags["checks"] = true
+                            elseif k == "retries" then
+                                route_up_flags["retries"] = true
+                            elseif k == "timeout" then
+                                route_up_flags["timeout"] = true
+                            elseif k == "tls" then
+                                route_up_flags["tls"] = true
+                            elseif k == "keepalive" then
+                                route_up_flags["keepalive"] = true
+                            elseif k == "service_name" then
+                                route_up_flags["service_name"] = true
+                            end
+                        end
+                    else
+                        route_up_flags["more_nodes"] = true
+                    end
+                end
+            end
         end
     end
 
     if route_flags["vars"] or route_flags["filter_fun"]
-         or route_flags["remote_addr"] then
+         or route_flags["remote_addr"]
+         or route_flags["service_id"]
+         or route_flags["plugin_config_id"] then
         router.router_http.match = orig_router_match
     else
         core.log.info("use ai plane to match route")
@@ -138,11 +213,44 @@ local function routes_analyze(routes)
             router.router_http.match = orig_router_match
         end
     end
+
+    if route_flags["service"]
+         or route_flags["service_id"]
+         or route_flags["upstream_id"]
+         or route_flags["enable_websocket"]
+         or route_flags["plugins"]
+         or route_up_flags["has_domain"]
+         or route_up_flags["pass_host"]
+         or route_up_flags["scheme"]
+         or route_up_flags["checks"]
+         or route_up_flags["retries"]
+         or route_up_flags["timeout"]
+         or route_up_flags["tls"]
+         or route_up_flags["keepalive"]
+         or route_up_flags["service_name"]
+         or route_up_flags["more_nodes"] then
+        apisix.handle_upstream = orig_handle_upstream
+        load_balancer.run = orig_balancer_run
+    else
+        -- replace the upstream module
+        apisix.handle_upstream = ai_upstream
+        load_balancer.run = ai_balancer_run
+    end
 end
 
 
 function _M.init()
     event.register(event.CONST.BUILD_ROUTER, routes_analyze)
+    local local_conf = core.config.local_conf()
+    local up_keepalive_conf =
+                        core.table.try_read_attr(local_conf, "nginx_config",
+                                                 "http", "upstream")
+    default_keepalive_pool.idle_timeout =
+        core.config_util.parse_time_unit(up_keepalive_conf.keepalive_timeout)
+    default_keepalive_pool.size = up_keepalive_conf.keepalive
+    default_keepalive_pool.requests = up_keepalive_conf.keepalive_requests
+
+    pool_opt = { pool_size = default_keepalive_pool.size }
 end
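
In ai.lua, routes_analyze() now also inspects each route's upstream: the cheap path is only installed when every route has a single IP node and uses none of the features that need the full machinery (domain nodes, pass_host other than "pass", a non-http scheme, checks, retries, timeout, tls, keepalive, service_name, service/upstream_id, websocket, or plugins). When installed, handle_upstream becomes a no-op and load_balancer.run is replaced by ai_balancer_run, which sets the peer directly via ngx.balancer using the keepalive defaults that _M.init() reads from nginx_config.http.upstream. A minimal sketch of that balancer fast path, with illustrative literal values standing in for the parsed configuration and an illustrative function name:

    -- minimal sketch of the direct-peer fast path; values are placeholders for
    -- what _M.init() reads from nginx_config.http.upstream in config-default.yaml
    local balancer = require("ngx.balancer")

    local default_keepalive_pool = {
        idle_timeout = 60,   -- parsed from keepalive_timeout (e.g. "60s")
        size = 320,          -- keepalive
        requests = 1000,     -- keepalive_requests
    }

    local function simple_balancer(route)
        local server = route.value.upstream.nodes[1]
        local ok, err = balancer.set_current_peer(server.host, server.port or 80,
                                                  { pool_size = default_keepalive_pool.size })
        if not ok then
            ngx.log(ngx.ERR, "failed to set peer: ", err)
            return
        end
        balancer.enable_keepalive(default_keepalive_pool.idle_timeout,
                                  default_keepalive_pool.requests)
    end
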
 
 
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index bad0e41e4..6e714b577 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -389,8 +389,8 @@ graphql:
   #cmd: ["ls", "-l"]
 
 plugins:                          # plugin list (sorted by priority)
-  - ai                             # priority: 25000
   - real-ip                        # priority: 23000
+  - ai                             # priority: 22900
   - client-control                 # priority: 22000
   - proxy-control                  # priority: 21990
   - request-id                     # priority: 12015
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index 74827e437..98e337e57 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -61,8 +61,8 @@ __DATA__
     }
 
 --- response_body
-ai
 real-ip
+ai
 client-control
 proxy-control
 request-id
diff --git a/t/core/config.t b/t/core/config.t
index 18191dae7..29d1cc52d 100644
--- a/t/core/config.t
+++ b/t/core/config.t
@@ -38,7 +38,7 @@ __DATA__
 GET /t
 --- response_body
 etcd host: http://127.0.0.1:2379
-first plugin: "ai"
+first plugin: "real-ip"
 
 
 
diff --git a/t/debug/dynamic-hook.t b/t/debug/dynamic-hook.t
index 692942d1f..87d4450d5 100644
--- a/t/debug/dynamic-hook.t
+++ b/t/debug/dynamic-hook.t
@@ -377,6 +377,11 @@ qr/call\srequire\(\"apisix.plugin\"\).filter\(\)\sreturn.*GET\s\/mysleep\?second
 
 
 === TEST 6: hook function with ctx as param
+# ai module would conflict with the debug module
+--- extra_yaml_config
+plugins:
+    #ai
+    - example-plugin
 --- debug_config
 basic:
   enable: true
diff --git a/t/plugin/ai.t b/t/plugin/ai.t
index 9415771ab..f695788ec 100644
--- a/t/plugin/ai.t
+++ b/t/plugin/ai.t
@@ -620,3 +620,258 @@ route cache key: /hello#GET
 done
 --- error_log
 route cache key: /hello#GET#127.0.0.1
+
+
+
+=== TEST 9: enable sample upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "methods": ["GET"],
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            ngx.sleep(0.5)
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- error_log
+enable sample upstream
+
+
+
+=== TEST 10: route has plugins and runs before_proxy, disable sample upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "serverless-pre-function": {
+                            "phase": "before_proxy",
+                            "functions" : ["return function(conf, ctx) ngx.log(ngx.WARN, \"run before_proxy phase balancer_ip : \", ctx.balancer_ip) end"]
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- error_log
+run before_proxy phase balancer_ip : 127.0.0.1
+--- no_error_log
+enable sample upstream
+
+
+
+=== TEST 11: upstream has more than one node, disable sample upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "methods": ["GET"],
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1,
+                            "127.0.0.1:1981": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            ngx.sleep(0.5)
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- no_error_log
+enable sample upstream
+
+
+
+=== TEST 12: node has domain, disable sample upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "methods": ["GET"],
+                    "upstream": {
+                        "nodes": {
+                            "admin.apisix.dev:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- no_error_log
+enable sample upstream
+
+
+
+=== TEST 13: enable --> disable sample upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "methods": ["GET"],
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            ngx.sleep(0.5)
+
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "methods": ["GET"],
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "enable_websocket": true,
+                    "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(body)
+                return
+            end
+            ngx.sleep(0.5)
+
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri)
+            assert(res.status == 200)
+            if not res then
+                ngx.log(ngx.ERR, err)
+                return
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+--- grep_error_log eval
+qr/enable sample upstream/
+--- grep_error_log_out
+enable sample upstream