You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/04/19 01:49:13 UTC

[apisix] branch master updated: feat(xRPC): simple redis support (#6873)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d88741bd feat(xRPC): simple redis support (#6873)
1d88741bd is described below

commit 1d88741bdcc98937e87503655b2f66c3b2653317
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Tue Apr 19 09:49:05 2022 +0800

    feat(xRPC): simple redis support (#6873)
    
    Signed-off-by: spacewander <sp...@gmail.com>
---
 Makefile                                      |   3 +
 apisix/stream/xrpc/protocols/redis/init.lua   | 239 ++++++++++++++++++++++++++
 apisix/stream/xrpc/protocols/redis/schema.lua |  34 ++++
 apisix/stream/xrpc/runner.lua                 |  16 +-
 apisix/stream/xrpc/sdk.lua                    |  78 +++++++++
 t/xrpc/redis.t                                | 172 ++++++++++++++++++
 6 files changed, 541 insertions(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 708b4fba6..39131e20c 100644
--- a/Makefile
+++ b/Makefile
@@ -352,6 +352,9 @@ install: runtime
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc
 	$(ENV_INSTALL) apisix/stream/xrpc/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/
 
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis
+	$(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/
+
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
 	$(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/
 
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
new file mode 100644
index 000000000..4edcb5922
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -0,0 +1,239 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local sdk = require("apisix.stream.xrpc.sdk")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local ffi = require("ffi")
+local ffi_str = ffi.string
+local math_random = math.random
+local OK = ngx.OK
+local DECLINED = ngx.DECLINED
+local DONE = ngx.DONE
+local str_byte = string.byte
+local str_fmt = string.format
+local tonumber = tonumber
+
+
+-- redis protocol spec: https://redis.io/docs/reference/protocol-spec/
+-- There is no plan to support inline command format
+local _M = {}
+local MAX_LINE_LEN = 128
+local PREFIX_ARR = str_byte("*")
+local PREFIX_STR = str_byte("$")
+local PREFIX_STA = str_byte("+")
+local PREFIX_INT = str_byte(":")
+local PREFIX_ERR = str_byte("-")
+
+
+function _M.init_downstream(session)
+    return xrpc_socket.downstream.socket()
+end
+
+
+local function read_line(sk)
+    local p, err, len = sk:read_line(MAX_LINE_LEN)
+    if not p then
+        return nil, err
+    end
+
+    if len < 2 then
+        return nil, "line too short"
+    end
+
+    return p, nil, len
+end
+
+
+local function read_len(sk)
+    local p, err, len = read_line(sk)
+    if not p then
+        return nil, err
+    end
+
+    local s = ffi_str(p + 1, len - 1)
+    local n = tonumber(s)
+    if not n then
+        return nil, str_fmt("invalid len string: \"%s\"", s)
+    end
+    return n
+end
+
+
+local function read_req(sk)
+    local narg, err = read_len(sk)
+    if not narg then
+        return nil, err
+    end
+
+    local ctx = {
+        cmd_line = core.table.new(narg, 0)
+    }
+
+    for i = 1, narg do
+        local n, err = read_len(sk)
+        if not n then
+            return nil, err
+        end
+
+        local p, err = sk:read(n + 2)
+        if not p then
+            return nil, err
+        end
+
+        local s = ffi_str(p, n)
+        ctx.cmd_line[i] = s
+    end
+
+    ctx.cmd = ctx.cmd_line[1]
+    return ctx
+end
+
+
+local function read_reply(sk)
+    local line, err, n = read_line(sk)
+    if not line then
+        return nil, err
+    end
+
+    local prefix = line[0]
+
+    if prefix == PREFIX_STR then    -- char '$'
+        -- print("bulk reply")
+
+        local size = tonumber(ffi_str(line + 1, n - 1))
+        if size < 0 then
+            -- return null
+            return true
+        end
+
+        local ok, err = sk:drain(size + 2)
+        if not ok then
+            return nil, err
+        end
+
+        return true
+
+    elseif prefix == PREFIX_STA then    -- char '+'
+        -- print("status reply")
+        -- return sub(line, 2)
+        return true
+
+    elseif prefix == PREFIX_ARR then -- char '*'
+        local narr = tonumber(ffi_str(line + 1, n - 1))
+
+        -- print("multi-bulk reply: ", narr)
+        if narr < 0 then
+            -- return null
+            return true
+        end
+
+        local vals = core.table.new(n, 0)
+        local nvals = 0
+        for i = 1, narr do
+            local res, err = read_reply(sk)
+            if res then
+                nvals = nvals + 1
+                vals[nvals] = res
+
+            elseif res == nil then
+                return nil, err
+
+            else
+                -- be a valid redis error value
+                nvals = nvals + 1
+                vals[nvals] = {false, err}
+            end
+        end
+
+        return vals
+
+    elseif prefix == PREFIX_INT then    -- char ':'
+        -- print("integer reply")
+        -- return tonumber(str_sub(line, 2))
+        return true
+
+    elseif prefix == PREFIX_ERR then    -- char '-'
+        -- print("error reply: ", n)
+        -- return false, str_sub(line, 2)
+        return true
+
+    else
+        return nil, str_fmt("unknown prefix: \"%s\"", prefix)
+    end
+end
+
+
+function _M.from_downstream(session, downstream)
+    local ctx, err = read_req(downstream)
+    if not ctx then
+        if err ~= "timeout" and err ~= "closed" then
+            core.log.error("failed to read request: ", err)
+        end
+        return DECLINED
+    end
+
+    return OK, ctx
+end
+
+
+function _M.connect_upstream(session, ctx)
+    local conf = session.upstream_conf
+    local nodes = conf.nodes
+    if #nodes == 0 then
+        core.log.error("failed to connect: no nodes")
+        return DECLINED
+    end
+
+    local node = nodes[math_random(#nodes)]
+    local sk = sdk.connect_upstream(node, conf)
+    if not sk then
+        return DECLINED
+    end
+
+    return OK, sk
+end
+
+
+function _M.to_upstream(session, ctx, downstream, upstream)
+    local ok, err = upstream:move(downstream)
+    if not ok then
+        core.log.error("failed to send to upstream: ", err)
+        return DECLINED
+    end
+
+    local p, err = read_reply(upstream)
+    if p == nil then
+        core.log.error("failed to handle upstream: ", err)
+        return DECLINED
+    end
+
+    local ok, err = downstream:move(upstream)
+    if not ok then
+        core.log.error("failed to handle upstream: ", err)
+        return DECLINED
+    end
+
+    return DONE
+end
+
+
+function _M.log(session, ctx)
+    -- TODO
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/protocols/redis/schema.lua b/apisix/stream/xrpc/protocols/redis/schema.lua
new file mode 100644
index 000000000..49a7a0e48
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/schema.lua
@@ -0,0 +1,34 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = {
+    type = "object",
+    properties = {
+    },
+}
+
+local _M = {}
+
+
+function _M.check_schema(conf)
+    return core.schema.check(schema, conf)
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index dd250c610..b364b0fb4 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -14,6 +14,8 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local core = require("apisix.core")
+local ngx_now = ngx.now
 local OK = ngx.OK
 local DECLINED = ngx.DECLINED
 local DONE = ngx.DONE
@@ -24,7 +26,8 @@ local _M = {}
 
 local function open_session(conn_ctx)
     conn_ctx.xrpc_session = {
-        upstream_conf = conn_ctx.matched_upstream
+        upstream_conf = conn_ctx.matched_upstream,
+        id_seq = 0,
     }
     return conn_ctx.xrpc_session
 end
@@ -42,8 +45,19 @@ local function close_session(session, upstream_broken)
 end
 
 
+local function put_req_ctx(session, ctx)
+    local id = ctx.id
+    session.ctxs[id] = nil
+
+    core.tablepool.release("xrpc_ctxs", ctx)
+end
+
+
 local function finish_req(protocol, session, ctx)
+    ctx.rpc_end_time = ngx_now()
+
     protocol.log(session, ctx)
+    put_req_ctx(session, ctx)
 end
 
 
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
new file mode 100644
index 000000000..63a179dab
--- /dev/null
+++ b/apisix/stream/xrpc/sdk.lua
@@ -0,0 +1,78 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Upstream helper functions which can be used in xRPC
+--
+-- @module xrpc.sdk
+local core = require("apisix.core")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local ngx_now = ngx.now
+
+
+local _M = {}
+
+
+---
+-- Returns the connected xRPC upstream socket according to the configuration
+--
+-- @function xrpc.sdk.connect_upstream
+-- @tparam table selected upstream node
+-- @tparam table upstream configuration
+-- @treturn table|nil the xRPC upstream socket, or nil if failed
+function _M.connect_upstream(node, up_conf)
+    local sk = xrpc_socket.upstream.socket()
+
+    local ok, err = sk:connect(node.host, node.port)
+    if not ok then
+        core.log.error("failed to connect: ", err)
+        return nil
+    end
+
+    if up_conf.scheme == "tls" then
+        local ok, err = sk:sslhandshake(nil, node.host)
+        if not ok then
+            core.log.error("failed to handshake: ", err)
+            return nil
+        end
+    end
+
+    return sk
+end
+
+
+---
+-- Returns the request level ctx with an optional id
+--
+-- @function xrpc.sdk.get_req_ctx
+-- @tparam table xrpc session
+-- @tparam string optional ctx id
+-- @treturn table the request level ctx
+function _M.get_req_ctx(session, id)
+    if not id then
+        id = session.id_seq
+        session.id_seq = session.id_seq + 1
+    end
+
+    local ctx = core.tablepool.fetch("xrpc_ctxs")
+    session.ctxs[id] = ctx
+
+    ctx.rpc_start_time = ngx_now()
+    return ctx
+end
+
+
+return _M
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
new file mode 100644
index 000000000..bc5319d65
--- /dev/null
+++ b/t/xrpc/redis.t
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version !~ m/\/apisix-nginx-module/) {
+    plan(skip_all => "apisix-nginx-module not installed");
+} else {
+    plan('no_plan');
+}
+
+$ENV{TEST_NGINX_REDIS_PORT} ||= 1985;
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->extra_yaml_config) {
+        my $extra_yaml_config = <<_EOC_;
+xrpc:
+  protocols:
+    - name: redis
+_EOC_
+        $block->set_value("extra_yaml_config", $extra_yaml_config);
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    $block;
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: init
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis"
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            local res, err = red:hmset("animals", "dog", "bark", "cat", "meow")
+            if not res then
+                ngx.say("failed to set animals: ", err)
+                return
+            end
+            ngx.say("hmset animals: ", res)
+
+            local res, err = red:hmget("animals", "dog", "cat")
+            if not res then
+                ngx.say("failed to get animals: ", err)
+                return
+            end
+
+            ngx.say("hmget animals: ", res)
+
+            local res, err = red:hget("animals", "dog")
+            if not res then
+                ngx.say("failed to get animals: ", err)
+                return
+            end
+
+            ngx.say("hget animals: ", res)
+
+            local res, err = red:hget("animals", "not_found")
+            if not res then
+                ngx.say("failed to get animals: ", err)
+                return
+            end
+
+            ngx.say("hget animals: ", res)
+        }
+    }
+--- response_body
+hmset animals: OK
+hmget animals: barkmeow
+hget animals: bark
+hget animals: null
+--- stream_conf_enable
+
+
+
+=== TEST 3: error
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            local res, err = red:get("animals")
+            if not res then
+                ngx.say("failed to set animals: ", err)
+            end
+
+            local res, err = red:hget("animals", "dog")
+            if not res then
+                ngx.say("failed to get animals: ", err)
+                return
+            end
+
+            ngx.say("hget animals: ", res)
+        }
+    }
+--- response_body
+failed to set animals: WRONGTYPE Operation against a key holding the wrong kind of value
+hget animals: bark
+--- stream_conf_enable