You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/04/19 01:49:13 UTC
[apisix] branch master updated: feat(xRPC): simple redis support (#6873)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 1d88741bd feat(xRPC): simple redis support (#6873)
1d88741bd is described below
commit 1d88741bdcc98937e87503655b2f66c3b2653317
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Tue Apr 19 09:49:05 2022 +0800
feat(xRPC): simple redis support (#6873)
Signed-off-by: spacewander <sp...@gmail.com>
---
Makefile | 3 +
apisix/stream/xrpc/protocols/redis/init.lua | 239 ++++++++++++++++++++++++++
apisix/stream/xrpc/protocols/redis/schema.lua | 34 ++++
apisix/stream/xrpc/runner.lua | 16 +-
apisix/stream/xrpc/sdk.lua | 78 +++++++++
t/xrpc/redis.t | 172 ++++++++++++++++++
6 files changed, 541 insertions(+), 1 deletion(-)
diff --git a/Makefile b/Makefile
index 708b4fba6..39131e20c 100644
--- a/Makefile
+++ b/Makefile
@@ -352,6 +352,9 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc
$(ENV_INSTALL) apisix/stream/xrpc/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/
+ $(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis
+ $(ENV_INSTALL) apisix/stream/xrpc/protocols/redis/*.lua $(ENV_INST_LUADIR)/apisix/stream/xrpc/protocols/redis/
+
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
$(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
new file mode 100644
index 000000000..4edcb5922
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -0,0 +1,239 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local sdk = require("apisix.stream.xrpc.sdk")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local ffi = require("ffi")
+local ffi_str = ffi.string
+local math_random = math.random
+local OK = ngx.OK
+local DECLINED = ngx.DECLINED
+local DONE = ngx.DONE
+local str_byte = string.byte
+local str_fmt = string.format
+local tonumber = tonumber
+
+
+-- redis protocol spec: https://redis.io/docs/reference/protocol-spec/
+-- There is no plan to support inline command format
+local _M = {}
+local MAX_LINE_LEN = 128
+local PREFIX_ARR = str_byte("*")
+local PREFIX_STR = str_byte("$")
+local PREFIX_STA = str_byte("+")
+local PREFIX_INT = str_byte(":")
+local PREFIX_ERR = str_byte("-")
+
+
+-- Build the xRPC socket object for the downstream (client) side.
+-- `session` is not used by this implementation.
+function _M.init_downstream(session)
+    local downstream = xrpc_socket.downstream
+    return downstream.socket()
+end
+
+
+-- Read one line from `sk`, passing MAX_LINE_LEN as the limit argument.
+--
+-- Returns the buffer pointer, nil and the reported length on success,
+-- or nil plus an error message. A reported length below 2 is rejected
+-- with "line too short".
+local function read_line(sk)
+    local buf, read_err, size = sk:read_line(MAX_LINE_LEN)
+    if not buf then
+        return nil, read_err
+    end
+
+    if size < 2 then
+        return nil, "line too short"
+    end
+
+    return buf, nil, size
+end
+
+
+-- Read a line and convert everything after its first byte to a number.
+-- The first byte (the type prefix) is skipped without being inspected.
+--
+-- Returns the number, or nil plus an error message.
+local function read_len(sk)
+    local line, err, line_len = read_line(sk)
+    if not line then
+        return nil, err
+    end
+
+    local digits = ffi_str(line + 1, line_len - 1)
+    local value = tonumber(digits)
+    if value then
+        return value
+    end
+
+    return nil, str_fmt("invalid len string: \"%s\"", digits)
+end
+
+
+-- Parse one request, sent as an array of bulk strings, from the socket `sk`.
+--
+-- Returns a ctx table with:
+--   cmd_line: the argument strings in the order they were sent
+--   cmd:      the first argument
+-- or nil plus an error message when reading fails or a length is invalid.
+local function read_req(sk)
+    local narg, err = read_len(sk)
+    if not narg then
+        return nil, err
+    end
+
+    -- the count is supplied by the client: refuse values which cannot
+    -- describe a command instead of producing a ctx without a command
+    if narg < 1 then
+        return nil, "invalid number of arguments: " .. narg
+    end
+
+    local cmd_line = core.table.new(narg, 0)
+
+    for i = 1, narg do
+        local n, err = read_len(sk)
+        if not n then
+            return nil, err
+        end
+
+        -- a negative length would make the read below cover less than the
+        -- trailing CRLF, so the following lines would be misparsed
+        if n < 0 then
+            return nil, "invalid bulk string length: " .. n
+        end
+
+        -- n bytes of payload followed by the trailing "\r\n"
+        local p, err = sk:read(n + 2)
+        if not p then
+            return nil, err
+        end
+
+        cmd_line[i] = ffi_str(p, n)
+    end
+
+    return {
+        cmd_line = cmd_line,
+        cmd = cmd_line[1],
+    }
+end
+
+
+-- Convert the text after the prefix byte of a reply header line to a number.
+-- Returns the number, or nil plus an error message when it is not numeric.
+local function parse_reply_len(line, n)
+    local s = ffi_str(line + 1, n - 1)
+    local num = tonumber(s)
+    if not num then
+        return nil, str_fmt("invalid len string: \"%s\"", s)
+    end
+
+    return num
+end
+
+
+-- Read one complete reply from `sk`, recursing into array replies.
+--
+-- Returns true for bulk, status, integer, error and null replies, a table
+-- holding one entry per element for array replies, or nil plus an error
+-- message when reading fails or the reply is malformed.
+local function read_reply(sk)
+    local line, err, n = read_line(sk)
+    if not line then
+        return nil, err
+    end
+
+    local prefix = line[0]
+
+    if prefix == PREFIX_STR then -- char '$'
+        -- a non-numeric size is reported as an error instead of raising
+        -- on the comparison below
+        local size, err = parse_reply_len(line, n)
+        if not size then
+            return nil, err
+        end
+
+        if size < 0 then
+            -- null bulk string: nothing follows the header line
+            return true
+        end
+
+        -- pass over the payload plus the trailing "\r\n"
+        local ok, err = sk:drain(size + 2)
+        if not ok then
+            return nil, err
+        end
+
+        return true
+
+    elseif prefix == PREFIX_STA then -- char '+'
+        return true
+
+    elseif prefix == PREFIX_ARR then -- char '*'
+        local narr, err = parse_reply_len(line, n)
+        if not narr then
+            return nil, err
+        end
+
+        if narr < 0 then
+            -- null array
+            return true
+        end
+
+        -- size the table by the element count, not by the header length
+        local vals = core.table.new(narr, 0)
+        local nvals = 0
+        for _ = 1, narr do
+            local res, err = read_reply(sk)
+            if res then
+                nvals = nvals + 1
+                vals[nvals] = res
+
+            elseif res == nil then
+                return nil, err
+
+            else
+                -- a false result stands for a redis error value; no branch
+                -- of this function returns false at the moment
+                nvals = nvals + 1
+                vals[nvals] = {false, err}
+            end
+        end
+
+        return vals
+
+    elseif prefix == PREFIX_INT then -- char ':'
+        return true
+
+    elseif prefix == PREFIX_ERR then -- char '-'
+        -- the whole error reply is contained in the line just read
+        return true
+
+    else
+        return nil, str_fmt("unknown prefix: \"%s\"", prefix)
+    end
+end
+
+
+-- Read the next request from the downstream socket.
+--
+-- Returns OK and the request ctx on success, DECLINED otherwise. Errors
+-- other than "timeout" and "closed" are written to the error log.
+function _M.from_downstream(session, downstream)
+    local ctx, err = read_req(downstream)
+    if ctx then
+        return OK, ctx
+    end
+
+    local quiet = (err == "timeout") or (err == "closed")
+    if not quiet then
+        core.log.error("failed to read request: ", err)
+    end
+
+    return DECLINED
+end
+
+
+-- Pick a random node from the session's upstream configuration and
+-- connect to it through the xRPC sdk.
+--
+-- Returns OK and the upstream socket, or DECLINED when there are no nodes
+-- or the connection could not be set up.
+function _M.connect_upstream(session, ctx)
+    local up_conf = session.upstream_conf
+    local candidates = up_conf.nodes
+    local count = #candidates
+    if count == 0 then
+        core.log.error("failed to connect: no nodes")
+        return DECLINED
+    end
+
+    local picked = candidates[math_random(count)]
+    local sk = sdk.connect_upstream(picked, up_conf)
+    if sk then
+        return OK, sk
+    end
+
+    return DECLINED
+end
+
+
+-- Handle one request/reply round trip: pass the request to the upstream,
+-- read the whole reply, then pass the reply back to the downstream.
+--
+-- Returns DONE once the reply has been handed to the downstream, or
+-- DECLINED (after logging the reason) when any of the steps fails.
+function _M.to_upstream(session, ctx, downstream, upstream)
+    local ok, err = upstream:move(downstream)
+    if not ok then
+        core.log.error("failed to send to upstream: ", err)
+        return DECLINED
+    end
+
+    -- only nil signals a failure here; any other result is a parsed reply
+    local p, err = read_reply(upstream)
+    if p == nil then
+        core.log.error("failed to handle upstream: ", err)
+        return DECLINED
+    end
+
+    local ok, err = downstream:move(upstream)
+    if not ok then
+        -- this step writes towards the client, so name the right side
+        core.log.error("failed to send to downstream: ", err)
+        return DECLINED
+    end
+
+    return DONE
+end
+
+
+function _M.log(session, ctx)
+    -- TODO: nothing is logged for the redis protocol yet
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/protocols/redis/schema.lua b/apisix/stream/xrpc/protocols/redis/schema.lua
new file mode 100644
index 000000000..49a7a0e48
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/schema.lua
@@ -0,0 +1,34 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+
+
+local schema = {
+ type = "object",
+ properties = {
+ },
+}
+
+local _M = {}
+
+
+-- Check `conf` against the (currently property-less) object schema above
+-- and hand back whatever core.schema.check returns.
+function _M.check_schema(conf)
+    local check = core.schema.check
+    return check(schema, conf)
+end
+
+
+return _M
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index dd250c610..b364b0fb4 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -14,6 +14,8 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
+local core = require("apisix.core")
+local ngx_now = ngx.now
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
@@ -24,7 +26,8 @@ local _M = {}
local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
- upstream_conf = conn_ctx.matched_upstream
+ upstream_conf = conn_ctx.matched_upstream,
+ id_seq = 0,
}
return conn_ctx.xrpc_session
end
@@ -42,8 +45,19 @@ local function close_session(session, upstream_broken)
end
+-- Forget the request ctx in the session and give its table back to the
+-- "xrpc_ctxs" table pool.
+local function put_req_ctx(session, ctx)
+    local id = ctx.id
+    local ctxs = session.ctxs
+    -- a ctx built by the protocol itself carries no id and the session may
+    -- have no registry yet: indexing nil or using a nil key would raise
+    if ctxs and id ~= nil then
+        ctxs[id] = nil
+    end
+
+    core.tablepool.release("xrpc_ctxs", ctx)
+end
+
+
local function finish_req(protocol, session, ctx)
+ ctx.rpc_end_time = ngx_now()
+
protocol.log(session, ctx)
+ put_req_ctx(session, ctx)
end
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
new file mode 100644
index 000000000..63a179dab
--- /dev/null
+++ b/apisix/stream/xrpc/sdk.lua
@@ -0,0 +1,78 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Upstream helper functions which can be used in xRPC
+--
+-- @module xrpc.sdk
+local core = require("apisix.core")
+local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
+local ngx_now = ngx.now
+
+
+local _M = {}
+
+
+---
+-- Opens an xRPC upstream socket to the given node, with TLS when configured
+--
+-- @function xrpc.sdk.connect_upstream
+-- @tparam table node selected upstream node, its `host` and `port` are used
+-- @tparam table up_conf upstream configuration, `scheme` "tls" enables TLS
+-- @treturn table|nil the xRPC upstream socket, or nil if failed
+function _M.connect_upstream(node, up_conf)
+    local sock = xrpc_socket.upstream.socket()
+
+    local connected, conn_err = sock:connect(node.host, node.port)
+    if not connected then
+        core.log.error("failed to connect: ", conn_err)
+        return nil
+    end
+
+    -- plain connections are ready as soon as they are established
+    if up_conf.scheme ~= "tls" then
+        return sock
+    end
+
+    local shaken, tls_err = sock:sslhandshake(nil, node.host)
+    if not shaken then
+        core.log.error("failed to handshake: ", tls_err)
+        return nil
+    end
+
+    return sock
+end
+
+
+---
+-- Returns a new request level ctx registered in the session under an id
+--
+-- When no id is given, the session's `id_seq` counter supplies one and is
+-- incremented. The id is stored on the ctx as `ctx.id` and the fetch time
+-- as `ctx.rpc_start_time`.
+--
+-- @function xrpc.sdk.get_req_ctx
+-- @tparam table session xrpc session
+-- @tparam ?string|number id optional ctx id
+-- @treturn table the request level ctx
+function _M.get_req_ctx(session, id)
+    if not id then
+        id = session.id_seq
+        session.id_seq = id + 1
+    end
+
+    -- create the registry on first use so a session without it still works
+    local ctxs = session.ctxs
+    if not ctxs then
+        ctxs = {}
+        session.ctxs = ctxs
+    end
+
+    -- explicit size hints: they are handed on to table.new when the pool
+    -- has no spare table
+    local ctx = core.tablepool.fetch("xrpc_ctxs", 4, 4)
+    -- remember the id on the ctx, it is needed to unregister the ctx later
+    ctx.id = id
+    ctxs[id] = ctx
+
+    ctx.rpc_start_time = ngx_now()
+    return ctx
+end
+
+
+return _M
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
new file mode 100644
index 000000000..bc5319d65
--- /dev/null
+++ b/t/xrpc/redis.t
@@ -0,0 +1,172 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX;
+
+my $nginx_binary = $ENV{'TEST_NGINX_BINARY'} || 'nginx';
+my $version = eval { `$nginx_binary -V 2>&1` };
+
+if ($version !~ m/\/apisix-nginx-module/) {
+ plan(skip_all => "apisix-nginx-module not installed");
+} else {
+ plan('no_plan');
+}
+
+$ENV{TEST_NGINX_REDIS_PORT} ||= 1985;
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!$block->extra_yaml_config) {
+ my $extra_yaml_config = <<_EOC_;
+xrpc:
+ protocols:
+ - name: redis
+_EOC_
+ $block->set_value("extra_yaml_config", $extra_yaml_config);
+ }
+
+ if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+ $block->set_value("no_error_log", "[error]");
+ }
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+ $block;
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: init
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local etcd = require("apisix.core.etcd")
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ {
+ protocol = {
+ name = "redis"
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:6379"] = 1
+ },
+ type = "roundrobin"
+ }
+ }
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 2: sanity
+--- config
+ location /t {
+ content_by_lua_block {
+ local redis = require "resty.redis"
+ local red = redis:new()
+
+ local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("failed to connect: ", err)
+ return
+ end
+
+ local res, err = red:hmset("animals", "dog", "bark", "cat", "meow")
+ if not res then
+ ngx.say("failed to set animals: ", err)
+ return
+ end
+ ngx.say("hmset animals: ", res)
+
+ local res, err = red:hmget("animals", "dog", "cat")
+ if not res then
+ ngx.say("failed to get animals: ", err)
+ return
+ end
+
+ ngx.say("hmget animals: ", res)
+
+ local res, err = red:hget("animals", "dog")
+ if not res then
+ ngx.say("failed to get animals: ", err)
+ return
+ end
+
+ ngx.say("hget animals: ", res)
+
+ local res, err = red:hget("animals", "not_found")
+ if not res then
+ ngx.say("failed to get animals: ", err)
+ return
+ end
+
+ ngx.say("hget animals: ", res)
+ }
+ }
+--- response_body
+hmset animals: OK
+hmget animals: barkmeow
+hget animals: bark
+hget animals: null
+--- stream_conf_enable
+
+
+
+=== TEST 3: error
+--- config
+ location /t {
+ content_by_lua_block {
+ local redis = require "resty.redis"
+ local red = redis:new()
+
+ local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("failed to connect: ", err)
+ return
+ end
+
+ local res, err = red:get("animals")
+ if not res then
+ ngx.say("failed to set animals: ", err)
+ end
+
+ local res, err = red:hget("animals", "dog")
+ if not res then
+ ngx.say("failed to get animals: ", err)
+ return
+ end
+
+ ngx.say("hget animals: ", res)
+ }
+ }
+--- response_body
+failed to set animals: WRONGTYPE Operation against a key holding the wrong kind of value
+hget animals: bark
+--- stream_conf_enable