You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by me...@apache.org on 2021/05/04 02:41:15 UTC

[apisix] branch master updated: feat: step 3, manage the plugin runner (#4163)

This is an automated email from the ASF dual-hosted git repository.

membphis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 624f59f  feat: step 3, manage the plugin runner (#4163)
624f59f is described below

commit 624f59fc78baf9eccc3987ee6bb12d9c0a3d0c43
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Tue May 4 10:41:08 2021 +0800

    feat: step 3, manage the plugin runner (#4163)
    
    * feat: step 3, manage the plugin runner
    * feat: step 4, add prepare_conf protocol
---
 apisix/cli/ngx_tpl.lua             |   1 +
 apisix/constants.lua               |   3 +
 apisix/init.lua                    |   1 +
 apisix/plugins/ext-plugin/init.lua | 201 ++++++++++++++++++++++++++++++++++---
 conf/config-default.yaml           |   3 +
 t/APISIX.pm                        |   1 +
 t/lib/ext-plugin.lua               |  29 +++++-
 t/plugin/ext-plugin/runner.sh      |  22 ++++
 t/plugin/ext-plugin/sanity.t       | 151 +++++++++++++++++++++++++++-
 9 files changed, 396 insertions(+), 16 deletions(-)

diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua
index 7173a3a..5f935cd 100644
--- a/apisix/cli/ngx_tpl.lua
+++ b/apisix/cli/ngx_tpl.lua
@@ -47,6 +47,7 @@ worker_rlimit_core  {* worker_rlimit_core *};
 worker_shutdown_timeout {* worker_shutdown_timeout *};
 
 env APISIX_PROFILE;
+env PATH; # for searching external plugin runner's binary
 
 {% if envs then %}
 {% for _, name in ipairs(envs) do %}
diff --git a/apisix/constants.lua b/apisix/constants.lua
index c668959..8dac0cb 100644
--- a/apisix/constants.lua
+++ b/apisix/constants.lua
@@ -15,6 +15,9 @@
 -- limitations under the License.
 --
 return {
+    RPC_ERROR = 0,
+    RPC_PREPARE_CONF = 1,
+    RPC_HTTP_REQ_CALL = 2,
     HTTP_ETCD_DIRECTORY = {
         ["/upstreams"] = true,
         ["/plugins"] = true,
diff --git a/apisix/init.lua b/apisix/init.lua
index 6d8234b..4a30b77 100644
--- a/apisix/init.lua
+++ b/apisix/init.lua
@@ -127,6 +127,7 @@ function _M.http_init_worker()
 
     require("apisix.debug").init_worker()
     require("apisix.upstream").init_worker()
+    require("apisix.plugins.ext-plugin.init").init_worker()
 
     local_conf = core.config.local_conf()
 
diff --git a/apisix/plugins/ext-plugin/init.lua b/apisix/plugins/ext-plugin/init.lua
index 8901529..924e9f4 100644
--- a/apisix/plugins/ext-plugin/init.lua
+++ b/apisix/plugins/ext-plugin/init.lua
@@ -14,8 +14,20 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local is_http = ngx.config.subsystem == "http"
+local flatbuffers = require("flatbuffers")
+local prepare_conf_req = require("A6.PrepareConf.Req")
+local prepare_conf_resp = require("A6.PrepareConf.Resp")
+local text_entry = require("A6.TextEntry")
+local constants = require("apisix.constants")
 local core = require("apisix.core")
 local helper = require("apisix.plugins.ext-plugin.helper")
+local process, ngx_pipe, events
+if is_http then
+    process = require("ngx.process")
+    ngx_pipe = require("ngx.pipe")
+    events = require("resty.worker.events")
+end
 local bit = require("bit")
 local band = bit.band
 local lshift = bit.lshift
@@ -25,6 +37,10 @@ local ffi_str = ffi.string
 local socket_tcp = ngx.socket.tcp
 local str_byte = string.byte
 local str_format = string.format
+local ngx_timer_at = ngx.timer.at
+local exiting = ngx.worker.exiting
+local error = error
+local events_list
 
 
 local lrucache = core.lrucache.new({
@@ -34,15 +50,40 @@ local lrucache = core.lrucache.new({
 
 local schema = {
     type = "object",
-    properties = {},
+    properties = {
+        conf = {
+            type = "array",
+            items = {
+                type = "object",
+                properties = {
+                    name = {
+                        type = "string",
+                        maxLength = 128,
+                        minLength = 1
+                    },
+                    value = {
+                        type = "string",
+                    },
+                }
+            },
+            minItems = 1,
+        },
+        extra_info = {
+            type = "array",
+            items = {
+                type = "string",
+                maxLength = 64,
+                minLength = 1,
+            },
+            minItems = 1,
+        }
+    },
 }
 
 local _M = {
     schema = schema,
 }
-local RPC_ERROR = 0
-local RPC_PREPARE_CONF = 1
-local RPC_HTTP_REQ_CALL = 2
+local builder = flatbuffers.Builder(0)
 
 
 local send
@@ -86,7 +127,7 @@ local function receive(sock)
     end
 
     local ty = str_byte(hdr, 1)
-    if ty == RPC_ERROR then
+    if ty == constants.RPC_ERROR then
         return nil, "TODO: handler err"
     end
 
@@ -115,8 +156,36 @@ local rpc_call
 local rpc_handlers = {
     nil,
     function (conf, ctx, sock)
-        local req = "prepare"
-        local ok, err = send(sock, RPC_PREPARE_CONF, req)
+        builder:Clear()
+
+        local conf_vec
+        if conf.conf then
+            local len = #conf.conf
+            local textEntries = core.table.new(len, 0)
+            for i = 1, len do
+                local name = builder:CreateString(conf.conf[i].name)
+                local value = builder:CreateString(conf.conf[i].value)
+                text_entry.Start(builder)
+                text_entry.AddName(builder, name)
+                text_entry.AddValue(builder, value)
+                local c = text_entry.End(builder)
+                textEntries[i] = c
+            end
+            prepare_conf_req.StartConfVector(builder, len)
+            for i = len, 1, -1 do
+                builder:PrependUOffsetTRelative(textEntries[i])
+            end
+            conf_vec = builder:EndVector(len)
+        end
+
+        prepare_conf_req.Start(builder)
+        if conf_vec then
+            prepare_conf_req.AddConf(builder, conf_vec)
+        end
+        local req = prepare_conf_req.End(builder)
+        builder:Finish(req)
+
+        local ok, err = send(sock, constants.RPC_PREPARE_CONF, builder:Output())
         if not ok then
             return nil, "failed to send RPC_PREPARE_CONF: " .. err
         end
@@ -126,22 +195,26 @@ local rpc_handlers = {
             return nil, "failed to receive RPC_PREPARE_CONF: " .. resp
         end
 
-        if ty ~= RPC_PREPARE_CONF then
+        if ty ~= constants.RPC_PREPARE_CONF then
             return nil, "failed to receive RPC_PREPARE_CONF: unexpected type " .. ty
         end
 
-        core.log.warn(resp)
-        return true
+        local buf = flatbuffers.binaryArray.New(resp)
+        local pcr = prepare_conf_resp.GetRootAsResp(buf, 0)
+        local token = pcr:ConfToken()
+
+        core.log.notice("get conf token: ", token, " conf: ", core.json.delay_encode(conf.conf))
+        return token
     end,
     function (conf, ctx, sock)
         local token, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, rpc_call,
-                                                    RPC_PREPARE_CONF, conf, ctx)
+                                                    constants.RPC_PREPARE_CONF, conf, ctx)
         if not token then
             return nil, err
         end
 
         local req = "hello"
-        local ok, err = send(sock, RPC_HTTP_REQ_CALL, req)
+        local ok, err = send(sock, constants.RPC_HTTP_REQ_CALL, req)
         if not ok then
             return nil, "failed to send RPC_HTTP_REQ_CALL: " .. err
         end
@@ -151,7 +224,7 @@ local rpc_handlers = {
             return nil, "failed to receive RPC_HTTP_REQ_CALL: " .. resp
         end
 
-        if ty ~= RPC_HTTP_REQ_CALL then
+        if ty ~= constants.RPC_HTTP_REQ_CALL then
             return nil, "failed to receive RPC_HTTP_REQ_CALL: unexpected type " .. ty
         end
 
@@ -186,7 +259,7 @@ end
 
 
 function _M.communicate(conf, ctx)
-    local ok, err = rpc_call(RPC_HTTP_REQ_CALL, conf, ctx)
+    local ok, err = rpc_call(constants.RPC_HTTP_REQ_CALL, conf, ctx)
     if not ok then
         core.log.error(err)
         return 503
@@ -194,4 +267,104 @@ function _M.communicate(conf, ctx)
 end
 
 
+local function create_lrucache() -- (re)creates the conf token cache held in `lrucache`
+    if lrucache then -- a cache already exists and is about to be replaced
+        core.log.warn("flush conf token lrucache")
+    end
+
+    lrucache = core.lrucache.new({ -- new instance: tokens cached so far are dropped
+        type = "plugin",
+        ttl = helper.get_conf_token_cache_time(), -- same value spawn_proc exports to the runner
+    })
+end
+
+
+local function spawn_proc(cmd) -- starts the runner `cmd`; returns the ngx.pipe process handle
+    local opt = {
+        merge_stderr = true, -- runner's stderr is read through the same stream as its stdout
+        environ = { -- settings handed to the runner through its environment
+            "APISIX_CONF_EXPIRE_TIME=" .. helper.get_conf_token_cache_time(),
+            "APISIX_LISTEN_ADDRESS=" .. helper.get_path(),
+        },
+    }
+    local proc, err = ngx_pipe.spawn(cmd, opt)
+    if not proc then -- spawn failure is raised as a Lua error, not returned to the caller
+        error(str_format("failed to start %s: %s", core.json.encode(cmd), err))
+        -- TODO: add retry
+    end
+
+    proc:set_timeouts(nil, nil, nil, 0) -- only the 4th (wait) timeout is changed; 0 = no timeout
+    return proc
+end
+
+
+local function setup_runner() -- registers the cache-flush handler and supervises the runner
+    local local_conf = core.config.local_conf()
+    local cmd = core.table.try_read_attr(local_conf, "ext-plugin", "cmd")
+    if not cmd then -- no `ext-plugin.cmd` in the local config: there is no runner to manage
+        return
+    end
+
+    events_list = events.event_list(
+        "process_runner_exit_event",
+        "runner_exit"
+    )
+
+    -- flush cache when runner exited (registered before the process type check below)
+    events.register(create_lrucache, events_list._source, events_list.runner_exit)
+
+    -- note that the runner is run under the same user as the Nginx master
+    if process.type() ~= "privileged agent" then -- only the privileged agent spawns the runner
+        return
+    end
+
+    local proc = spawn_proc(cmd) -- raises if the first spawn fails
+    ngx_timer_at(0, function(premature) -- NOTE(review): result of ngx.timer.at is not checked
+        if premature then
+            return
+        end
+
+        while not exiting() do -- one iteration per runner lifetime
+            while true do
+                -- drain output
+                local max = 3800 -- smaller than Nginx error log length limit
+                local data, err = proc:stdout_read_any(max)
+                if not data then
+                    if exiting() then
+                        return
+                    end
+
+                    if err == "closed" then -- any other read error just retries the read
+                        break
+                    end
+                else
+                    -- we log stdout here just for debug or test
+                    -- the runner itself should log to a file
+                    core.log.warn(data)
+                end
+            end
+
+            local ok, reason, status = proc:wait()
+            if not ok then -- e.g. "exit" / 111 for the test runner script
+                core.log.warn("runner exited with reason: ", reason, ", status: ", status)
+            end
+
+            local ok, err = events.post(events_list._source, events_list.runner_exit)
+            if not ok then -- the post above is what triggers the registered create_lrucache
+                core.log.error("post event failure with ", events_list._source, ", error: ", err)
+            end
+
+            core.log.warn("respawn runner with cmd: ", core.json.encode(cmd))
+            proc = spawn_proc(cmd) -- NOTE(review): an error here ends this loop; no backoff
+        end
+    end)
+end
+
+
+function _M.init_worker() -- worker start-up hook (called from apisix/init.lua http_init_worker)
+    create_lrucache() -- build the conf token cache for this process
+    setup_runner() -- register the flush handler; start the runner when `cmd` is configured
+end
+
+
 return _M
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 39bbe24..5385d6c 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -228,6 +228,9 @@ etcd:
 graphql:
   max_size: 1048576               # the maximum size limitation of graphql in bytes, default 1MiB
 
+# ext-plugin:
+  # cmd: ["ls", "-l"]
+
 plugins:                          # plugin list (sorted in alphabetical order)
   - api-breaker
   - authz-keycloak
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 0e147f6..8e662f6 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -229,6 +229,7 @@ _EOC_
 worker_rlimit_core  500M;
 env ENABLE_ETCD_AUTH;
 env APISIX_PROFILE;
+env PATH; # for searching external plugin runner's binary
 env TEST_NGINX_HTML_DIR;
 _EOC_
 
diff --git a/t/lib/ext-plugin.lua b/t/lib/ext-plugin.lua
index a2c9380..9b0a52d 100644
--- a/t/lib/ext-plugin.lua
+++ b/t/lib/ext-plugin.lua
@@ -15,12 +15,17 @@
 -- limitations under the License.
 --
 local ext = require("apisix.plugins.ext-plugin.init")
+local constants = require("apisix.constants")
+local flatbuffers = require("flatbuffers")
+local prepare_conf_req = require("A6.PrepareConf.Req")
+local prepare_conf_resp = require("A6.PrepareConf.Resp")
 
 
 local _M = {}
+local builder = flatbuffers.Builder(0)
 
 
-function _M.go()
+function _M.go(case)
     local sock = ngx.req.socket()
     local ty, data = ext.receive(sock)
     if not ty then
@@ -29,6 +34,28 @@ function _M.go()
     end
     ngx.log(ngx.WARN, "receive rpc call successfully")
 
+    if ty == constants.RPC_PREPARE_CONF then -- verify the request, reply with a fixed token
+        local buf = flatbuffers.binaryArray.New(data)
+        local pc = prepare_conf_req.GetRootAsReq(buf, 0)
+
+        if case.with_conf then -- assert(v, msg) only tests v, so compare explicitly
+            local conf = pc:Conf(1)
+            assert(conf:Name() == "foo", "unexpected name of conf entry 1")
+            assert(conf:Value() == "bar", "unexpected value of conf entry 1")
+            conf = pc:Conf(2)
+            assert(conf:Name() == "cat", "unexpected name of conf entry 2")
+            assert(conf:Value() == "dog", "unexpected value of conf entry 2")
+        else
+            assert(pc:ConfLength() == 0)
+        end
+
+        builder:Clear() -- module-level builder: drop the previously built response first
+        prepare_conf_resp.Start(builder)
+        prepare_conf_resp.AddConfToken(builder, 233)
+        builder:Finish(prepare_conf_resp.End(builder)) -- close Resp with its own End
+        data = builder:Output()
+    end
+
     local ok, err = ext.send(sock, ty, data)
     if not ok then
         ngx.log(ngx.ERR, err)
diff --git a/t/plugin/ext-plugin/runner.sh b/t/plugin/ext-plugin/runner.sh
new file mode 100755
index 0000000..5b87852
--- /dev/null
+++ b/t/plugin/ext-plugin/runner.sh
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+echo "LISTEN $APISIX_LISTEN_ADDRESS" # echo the address passed via the environment (TEST 6)
+echo "EXPIRE $APISIX_CONF_EXPIRE_TIME" # echo the cache time passed via the environment (TEST 6)
+sleep "$1" # stay alive for the number of seconds given as the first argument
+exit 111 # distinctive exit status; TEST 7 expects it in the "runner exited" log line
diff --git a/t/plugin/ext-plugin/sanity.t b/t/plugin/ext-plugin/sanity.t
index 388bf82..be275f4 100644
--- a/t/plugin/ext-plugin/sanity.t
+++ b/t/plugin/ext-plugin/sanity.t
@@ -22,6 +22,8 @@ no_root_location();
 no_shuffle();
 log_level("info");
 
+$ENV{"PATH"} = $ENV{PATH} . ":" . $ENV{TEST_NGINX_HTML_DIR};
+
 add_block_preprocessor(sub {
     my ($block) = @_;
 
@@ -34,7 +36,7 @@ add_block_preprocessor(sub {
 
         content_by_lua_block {
             local ext = require("lib.ext-plugin")
-            ext.go()
+            ext.go({})
         }
     }
 
@@ -43,10 +45,12 @@ _EOC_
     }
 
     my $unix_socket_path = $ENV{"TEST_NGINX_HTML_DIR"} . "/nginx.sock";
+    my $orig_extra_yaml_config = $block->extra_yaml_config // "";
     my $extra_yaml_config = <<_EOC_;
 ext-plugin:
     path_for_test: $unix_socket_path
 _EOC_
+    $extra_yaml_config = $extra_yaml_config . $orig_extra_yaml_config;
 
     $block->set_value("extra_yaml_config", $extra_yaml_config);
 
@@ -106,6 +110,10 @@ passed
 GET /hello
 --- response_body
 hello world
+--- error_log
+get conf token: 233
+--- no_error_log
+[error]
 --- grep_error_log eval
 qr/(sending|receiving) rpc type: \d data length:/
 --- grep_error_log_out
@@ -167,3 +175,144 @@ GET /hello
 --- error_code: 503
 --- error_log
 failed to connect to the unix socket
+
+
+
+=== TEST 6: spawn runner
+--- extra_yaml_config
+    cmd: ["t/plugin/ext-plugin/runner.sh", "3600"]
+--- config
+    location /t {
+        return 200;
+    }
+--- grep_error_log eval
+qr/LISTEN unix:\S+/
+--- grep_error_log_out eval
+qr/LISTEN unix:.+\/nginx.sock/
+--- error_log
+EXPIRE 3600
+
+
+
+=== TEST 7: respawn runner when it exited
+--- extra_yaml_config
+    cmd: ["t/plugin/ext-plugin/runner.sh", "0.1"]
+--- config
+    location /t {
+        content_by_lua_block {
+            ngx.sleep(0.2)
+        }
+    }
+--- error_log
+runner exited with reason: exit, status: 111
+respawn runner with cmd: ["t\/plugin\/ext-plugin\/runner.sh","0.1"]
+
+
+
+=== TEST 8: flush cache when runner exited
+--- extra_yaml_config
+    cmd: ["t/plugin/ext-plugin/runner.sh", "0.4"]
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require "resty.http"
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+            local function r()
+                local httpc = http.new()
+                local res, err = httpc:request_uri(uri)
+                if not res then
+                    ngx.log(ngx.ERR, err)
+                    return
+                else
+                    ngx.print(res.body)
+                end
+            end
+
+            r()
+            r()
+            ngx.sleep(0.5)
+            r()
+        }
+    }
+--- response_body
+hello world
+hello world
+hello world
+--- grep_error_log eval
+qr/(sending|receiving) rpc type: 1 data length:/
+--- grep_error_log_out
+sending rpc type: 1 data length:
+receiving rpc type: 1 data length:
+sending rpc type: 1 data length:
+receiving rpc type: 1 data length:
+sending rpc type: 1 data length:
+receiving rpc type: 1 data length:
+sending rpc type: 1 data length:
+receiving rpc type: 1 data length:
+--- error_log
+flush conf token lrucache
+--- no_error_log
+[error]
+
+
+
+=== TEST 9: prepare conf
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+
+            local code, message, res = t.test('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/hello",
+                    "plugins": {
+                        "ext-plugin-pre-req": {
+                            "conf": [
+                                {"name":"foo", "value":"bar"},
+                                {"name":"cat", "value":"dog"}
+                            ]
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say(message)
+                return
+            end
+
+            ngx.say(message)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 10: hit
+--- request
+GET /hello
+--- response_body
+hello world
+--- extra_stream_config
+    server {
+        listen unix:$TEST_NGINX_HTML_DIR/nginx.sock;
+
+        content_by_lua_block {
+            local ext = require("lib.ext-plugin")
+            ext.go({with_conf = true})
+        }
+    }
+--- error_log eval
+qr/get conf token: 233 conf: \[(\{"value":"bar","name":"foo"\}|\{"name":"foo","value":"bar"\}),(\{"value":"dog","name":"cat"\}|\{"name":"cat","value":"dog"\})\]/
+--- no_error_log
+[error]