You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/13 08:04:45 UTC
[apisix] branch master updated: feat(redis): support pubsub (#7031)
This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 025710564 feat(redis): support pubsub (#7031)
025710564 is described below
commit 025710564ac717f99fb96dd19dc1fdd1344bc7ef
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Fri May 13 16:04:39 2022 +0800
feat(redis): support pubsub (#7031)
Signed-off-by: spacewander <sp...@gmail.com>
---
apisix/stream/xrpc/protocols/redis/init.lua | 107 ++++++++++++--
t/APISIX.pm | 3 +
t/xrpc/redis.t | 220 ++++++++++++++++++++++++++++
3 files changed, 321 insertions(+), 9 deletions(-)
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
index 24ca98f0c..35b8604f8 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -153,7 +153,12 @@ local function read_req(session, sk)
end
local s = ffi_str(p, n)
- cmd_line[1] = s
+ local cmd = s:lower()
+ cmd_line[1] = cmd
+
+ if cmd == "subscribe" or cmd == "psubscribe" then
+ session.in_pub_sub = true
+ end
local key_finder
local matcher = session.matcher
@@ -209,7 +214,7 @@ local function read_req(session, sk)
session.req_id_seq = session.req_id_seq + 1
local ctx = sdk.get_req_ctx(session, session.req_id_seq)
ctx.cmd_line = cmd_line
- ctx.cmd = ctx.cmd_line[1]
+ ctx.cmd = cmd
local pipelined = sk:has_pending_data()
@@ -228,7 +233,37 @@ local function read_req(session, sk)
end
-local function read_reply(sk)
+local function read_subscribe_reply(sk)
+ local line, err, n = read_line(sk)
+ if not line then
+ return nil, err
+ end
+
+ local prefix = line[0]
+
+ if prefix == PREFIX_STR then -- char '$'
+ local size = tonumber(ffi_str(line + 1, n - 1))
+ if size < 0 then
+ return true
+ end
+
+ local p, err = sk:read(size + 2)
+ if not p then
+ return nil, err
+ end
+
+ return ffi_str(p, size)
+
+ elseif prefix == PREFIX_INT then -- char ':'
+ return tonumber(ffi_str(line + 1, n - 1))
+
+ else
+ return nil, str_fmt("unknown prefix: \"%s\"", prefix)
+ end
+end
+
+
+local function read_reply(sk, session)
local line, err, n = read_line(sk)
if not line then
return nil, err
@@ -263,12 +298,49 @@ local function read_reply(sk)
return true
end
- for i = 1, narr do
+ if session and session.in_pub_sub and (narr == 3 or narr == 4) then
+ local msg_type, err = read_subscribe_reply(sk)
+ if msg_type == nil then
+ return nil, err
+ end
+
+ session.pub_sub_msg_type = msg_type
+
local res, err = read_reply(sk)
if res == nil then
return nil, err
end
+
+ if msg_type == "unsubscribe" or msg_type == "punsubscribe" then
+ local n_ch, err = read_subscribe_reply(sk)
+ if n_ch == nil then
+ return nil, err
+ end
+
+ if n_ch == 0 then
+ session.in_pub_sub = -1
+ -- clear this flag later at the end of `handle_reply`
+ end
+
+ else
+ local n = msg_type == "pmessage" and 2 or 1
+ for i = 1, n do
+ local res, err = read_reply(sk)
+ if res == nil then
+ return nil, err
+ end
+ end
+ end
+
+ else
+ for i = 1, narr do
+ local res, err = read_reply(sk)
+ if res == nil then
+ return nil, err
+ end
+ end
end
+
return true
elseif prefix == PREFIX_INT then -- char ':'
@@ -286,14 +358,31 @@ end
local function handle_reply(session, sk)
- local ok, err = read_reply(sk)
+ local ok, err = read_reply(sk, session)
if not ok then
return nil, err
end
- -- TODO: don't update resp_id_seq if the reply is subscribed msg
- session.resp_id_seq = session.resp_id_seq + 1
- local ctx = sdk.get_req_ctx(session, session.resp_id_seq)
+ local ctx
+ if session.in_pub_sub and session.pub_sub_msg_type then
+ local msg_type = session.pub_sub_msg_type
+ session.pub_sub_msg_type = nil
+ if session.resp_id_seq < session.req_id_seq then
+ local cur_ctx = sdk.get_req_ctx(session, session.resp_id_seq + 1)
+ local cmd = cur_ctx.cmd
+ if cmd == msg_type then
+ ctx = cur_ctx
+ session.resp_id_seq = session.resp_id_seq + 1
+ end
+ end
+
+ if session.in_pub_sub == -1 then
+ session.in_pub_sub = nil
+ end
+ else
+ session.resp_id_seq = session.resp_id_seq + 1
+ ctx = sdk.get_req_ctx(session, session.resp_id_seq)
+ end
return ctx
end
@@ -371,7 +460,7 @@ end
function _M.from_upstream(session, downstream, upstream)
local ctx, err = handle_reply(session, upstream)
- if ctx == nil then
+ if err then
core.log.error("failed to handle upstream: ", err)
return DECLINED
end
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 21bcedceb..b8e87a859 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -376,9 +376,12 @@ _EOC_
apisix.stream_init(args)
_EOC_
+ my $stream_extra_init_by_lua = $block->stream_extra_init_by_lua // "";
+
$stream_config .= <<_EOC_;
init_by_lua_block {
$stream_init_by_lua_block
+ $stream_extra_init_by_lua
}
init_worker_by_lua_block {
apisix.stream_init_worker()
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
index a3a9ec9ca..054971e08 100644
--- a/t/xrpc/redis.t
+++ b/t/xrpc/redis.t
@@ -565,3 +565,223 @@ passed
--- response_body
ok
--- stream_conf_enable
+
+
+
+=== TEST 12: publish & subscribe
+--- stream_extra_init_by_lua
+ local cjson = require "cjson"
+ local redis_proto = require("apisix.stream.xrpc.protocols.redis")
+ redis_proto.log = function(sess, ctx)
+ ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
+ end
+
+--- config
+ location /t {
+ content_by_lua_block {
+ local cjson = require "cjson"
+ local redis = require "resty.redis"
+
+ local red = redis:new()
+ local red2 = redis:new()
+
+ red:set_timeout(1000) -- 1 sec
+ red2:set_timeout(1000) -- 1 sec
+
+ local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("1: failed to connect: ", err)
+ return
+ end
+
+ ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("2: failed to connect: ", err)
+ return
+ end
+
+ local res, err = red:subscribe("dog")
+ if not res then
+ ngx.say("1: failed to subscribe: ", err)
+ return
+ end
+
+ ngx.say("1: subscribe dog: ", cjson.encode(res))
+
+ res, err = red:subscribe("cat")
+ if not res then
+ ngx.say("1: failed to subscribe: ", err)
+ return
+ end
+
+ ngx.say("1: subscribe cat: ", cjson.encode(res))
+
+ res, err = red2:publish("dog", "Hello")
+ if not res then
+ ngx.say("2: failed to publish: ", err)
+ return
+ end
+
+ ngx.say("2: publish: ", cjson.encode(res))
+
+ res, err = red:read_reply()
+ if not res then
+ ngx.say("1: failed to read reply: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+
+ red:set_timeout(10) -- 10ms
+ res, err = red:read_reply()
+ if not res then
+ ngx.say("1: failed to read reply: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+ red:set_timeout(1000) -- 1s
+
+ res, err = red:unsubscribe()
+ if not res then
+ ngx.say("1: failed to unscribe: ", err)
+ else
+ ngx.say("1: unsubscribe: ", cjson.encode(res))
+ end
+
+ res, err = red:read_reply()
+ if not res then
+ ngx.say("1: failed to read reply: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+
+ red:set_timeout(10) -- 10ms
+ res, err = red:read_reply()
+ if not res then
+ ngx.say("1: failed to read reply: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+ red:set_timeout(1000) -- 1s
+
+ res, err = red:set("dog", 1)
+ if not res then
+ ngx.say("1: failed to set: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+
+ red:close()
+ red2:close()
+ }
+ }
+--- response_body_like chop
+^1: subscribe dog: \["subscribe","dog",1\]
+1: subscribe cat: \["subscribe","cat",2\]
+2: publish: 1
+1: receive: \["message","dog","Hello"\]
+1: failed to read reply: timeout
+1: unsubscribe: \[\["unsubscribe","(?:dog|cat)",1\],\["unsubscribe","(?:dog|cat)",0\]\]
+1: failed to read reply: not subscribed
+1: failed to read reply: not subscribed
+1: receive: "OK"
+$
+--- stream_conf_enable
+--- grep_error_log eval
+qr/log redis request \[[^]]+\]/
+--- grep_error_log_out
+log redis request ["subscribe","dog"]
+log redis request ["subscribe","cat"]
+log redis request ["publish","dog","Hello"]
+log redis request ["unsubscribe"]
+log redis request ["set","dog","1"]
+
+
+
+=== TEST 13: psubscribe & punsubscribe
+--- stream_extra_init_by_lua
+ local cjson = require "cjson"
+ local redis_proto = require("apisix.stream.xrpc.protocols.redis")
+ redis_proto.log = function(sess, ctx)
+ ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
+ end
+
+--- config
+ location /t {
+ content_by_lua_block {
+ local cjson = require "cjson"
+ local redis = require "resty.redis"
+
+ local red = redis:new()
+ local red2 = redis:new()
+
+ red:set_timeout(1000) -- 1 sec
+ red2:set_timeout(1000) -- 1 sec
+
+ local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("1: failed to connect: ", err)
+ return
+ end
+
+ ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+ if not ok then
+ ngx.say("2: failed to connect: ", err)
+ return
+ end
+
+ local res, err = red:psubscribe("dog*", "cat*")
+ if not res then
+ ngx.say("1: failed to subscribe: ", err)
+ return
+ end
+
+ ngx.say("1: psubscribe: ", cjson.encode(res))
+
+ res, err = red2:publish("dog1", "Hello")
+ if not res then
+ ngx.say("2: failed to publish: ", err)
+ return
+ end
+
+ ngx.say("2: publish: ", cjson.encode(res))
+
+ res, err = red:read_reply()
+ if not res then
+ ngx.say("1: failed to read reply: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+
+ res, err = red:punsubscribe("cat*", "dog*")
+ if not res then
+ ngx.say("1: failed to unscribe: ", err)
+ else
+ ngx.say("1: punsubscribe: ", cjson.encode(res))
+ end
+
+ res, err = red:set("dog", 1)
+ if not res then
+ ngx.say("1: failed to set: ", err)
+ else
+ ngx.say("1: receive: ", cjson.encode(res))
+ end
+
+ red:close()
+ red2:close()
+ }
+ }
+--- response_body_like chop
+^1: psubscribe: \[\["psubscribe","dog\*",1\],\["psubscribe","cat\*",2\]\]
+2: publish: 1
+1: receive: \["pmessage","dog\*","dog1","Hello"\]
+1: punsubscribe: \[\["punsubscribe","cat\*",1\],\["punsubscribe","dog\*",0\]\]
+1: receive: "OK"
+$
+--- stream_conf_enable
+--- grep_error_log eval
+qr/log redis request \[[^]]+\]/
+--- grep_error_log_out
+log redis request ["psubscribe","dog*","cat*"]
+log redis request ["publish","dog1","Hello"]
+log redis request ["punsubscribe","cat*","dog*"]
+log redis request ["set","dog","1"]