You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/13 08:04:45 UTC

[apisix] branch master updated: feat(redis): support pubsub (#7031)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 025710564 feat(redis): support pubsub (#7031)
025710564 is described below

commit 025710564ac717f99fb96dd19dc1fdd1344bc7ef
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Fri May 13 16:04:39 2022 +0800

    feat(redis): support pubsub (#7031)
    
    Signed-off-by: spacewander <sp...@gmail.com>
---
 apisix/stream/xrpc/protocols/redis/init.lua | 107 ++++++++++++--
 t/APISIX.pm                                 |   3 +
 t/xrpc/redis.t                              | 220 ++++++++++++++++++++++++++++
 3 files changed, 321 insertions(+), 9 deletions(-)

diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
index 24ca98f0c..35b8604f8 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -153,7 +153,12 @@ local function read_req(session, sk)
     end
 
     local s = ffi_str(p, n)
-    cmd_line[1] = s
+    local cmd = s:lower()
+    cmd_line[1] = cmd
+
+    if cmd == "subscribe" or cmd == "psubscribe" then
+        session.in_pub_sub = true
+    end
 
     local key_finder
     local matcher = session.matcher
@@ -209,7 +214,7 @@ local function read_req(session, sk)
     session.req_id_seq = session.req_id_seq + 1
     local ctx = sdk.get_req_ctx(session, session.req_id_seq)
     ctx.cmd_line = cmd_line
-    ctx.cmd = ctx.cmd_line[1]
+    ctx.cmd = cmd
 
     local pipelined = sk:has_pending_data()
 
@@ -228,7 +233,37 @@ local function read_req(session, sk)
 end
 
 
-local function read_reply(sk)
+local function read_subscribe_reply(sk)
+    local line, err, n = read_line(sk)
+    if not line then
+        return nil, err
+    end
+
+    local prefix = line[0]
+
+    if prefix == PREFIX_STR then    -- char '$'
+        local size = tonumber(ffi_str(line + 1, n - 1))
+        if size < 0 then
+            return true
+        end
+
+        local p, err = sk:read(size + 2)
+        if not p then
+            return nil, err
+        end
+
+        return ffi_str(p, size)
+
+    elseif prefix == PREFIX_INT then    -- char ':'
+        return tonumber(ffi_str(line + 1, n - 1))
+
+    else
+        return nil, str_fmt("unknown prefix: \"%s\"", prefix)
+    end
+end
+
+
+local function read_reply(sk, session)
     local line, err, n = read_line(sk)
     if not line then
         return nil, err
@@ -263,12 +298,49 @@ local function read_reply(sk)
             return true
         end
 
-        for i = 1, narr do
+        if session and session.in_pub_sub and (narr == 3 or narr == 4) then
+            local msg_type, err = read_subscribe_reply(sk)
+            if msg_type == nil then
+                return nil, err
+            end
+
+            session.pub_sub_msg_type = msg_type
+
             local res, err = read_reply(sk)
             if res == nil then
                 return nil, err
             end
+
+            if msg_type == "unsubscribe" or msg_type == "punsubscribe" then
+                local n_ch, err = read_subscribe_reply(sk)
+                if n_ch == nil then
+                    return nil, err
+                end
+
+                if n_ch == 0 then
+                    session.in_pub_sub = -1
+                    -- clear this flag later at the end of `handle_reply`
+                end
+
+            else
+                local n = msg_type == "pmessage" and 2 or 1
+                for i = 1, n do
+                    local res, err = read_reply(sk)
+                    if res == nil then
+                        return nil, err
+                    end
+                end
+            end
+
+        else
+            for i = 1, narr do
+                local res, err = read_reply(sk)
+                if res == nil then
+                    return nil, err
+                end
+            end
         end
+
         return true
 
     elseif prefix == PREFIX_INT then    -- char ':'
@@ -286,14 +358,31 @@ end
 
 
 local function handle_reply(session, sk)
-    local ok, err = read_reply(sk)
+    local ok, err = read_reply(sk, session)
     if not ok then
         return nil, err
     end
 
-    -- TODO: don't update resp_id_seq if the reply is subscribed msg
-    session.resp_id_seq = session.resp_id_seq + 1
-    local ctx = sdk.get_req_ctx(session, session.resp_id_seq)
+    local ctx
+    if session.in_pub_sub and session.pub_sub_msg_type then
+        local msg_type = session.pub_sub_msg_type
+        session.pub_sub_msg_type = nil
+        if session.resp_id_seq < session.req_id_seq then
+            local cur_ctx = sdk.get_req_ctx(session, session.resp_id_seq + 1)
+            local cmd = cur_ctx.cmd
+            if cmd == msg_type then
+                ctx = cur_ctx
+                session.resp_id_seq = session.resp_id_seq + 1
+            end
+        end
+
+        if session.in_pub_sub == -1 then
+            session.in_pub_sub = nil
+        end
+    else
+        session.resp_id_seq = session.resp_id_seq + 1
+        ctx = sdk.get_req_ctx(session, session.resp_id_seq)
+    end
 
     return ctx
 end
@@ -371,7 +460,7 @@ end
 
 function _M.from_upstream(session, downstream, upstream)
     local ctx, err = handle_reply(session, upstream)
-    if ctx == nil then
+    if err then
         core.log.error("failed to handle upstream: ", err)
         return DECLINED
     end
diff --git a/t/APISIX.pm b/t/APISIX.pm
index 21bcedceb..b8e87a859 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -376,9 +376,12 @@ _EOC_
         apisix.stream_init(args)
 _EOC_
 
+    my $stream_extra_init_by_lua = $block->stream_extra_init_by_lua // "";
+
     $stream_config .= <<_EOC_;
     init_by_lua_block {
         $stream_init_by_lua_block
+        $stream_extra_init_by_lua
     }
     init_worker_by_lua_block {
         apisix.stream_init_worker()
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
index a3a9ec9ca..054971e08 100644
--- a/t/xrpc/redis.t
+++ b/t/xrpc/redis.t
@@ -565,3 +565,223 @@ passed
 --- response_body
 ok
 --- stream_conf_enable
+
+
+
+=== TEST 12: publish & subscribe
+--- stream_extra_init_by_lua
+            local cjson = require "cjson"
+            local redis_proto = require("apisix.stream.xrpc.protocols.redis")
+            redis_proto.log = function(sess, ctx)
+                ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
+            end
+
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require "cjson"
+            local redis = require "resty.redis"
+
+            local red = redis:new()
+            local red2 = redis:new()
+
+            red:set_timeout(1000) -- 1 sec
+            red2:set_timeout(1000) -- 1 sec
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("1: failed to connect: ", err)
+                return
+            end
+
+            ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("2: failed to connect: ", err)
+                return
+            end
+
+            local res, err = red:subscribe("dog")
+            if not res then
+                ngx.say("1: failed to subscribe: ", err)
+                return
+            end
+
+            ngx.say("1: subscribe dog: ", cjson.encode(res))
+
+            res, err = red:subscribe("cat")
+            if not res then
+                ngx.say("1: failed to subscribe: ", err)
+                return
+            end
+
+            ngx.say("1: subscribe cat: ", cjson.encode(res))
+
+            res, err = red2:publish("dog", "Hello")
+            if not res then
+                ngx.say("2: failed to publish: ", err)
+                return
+            end
+
+            ngx.say("2: publish: ", cjson.encode(res))
+
+            res, err = red:read_reply()
+            if not res then
+                ngx.say("1: failed to read reply: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+
+            red:set_timeout(10) -- 10ms
+            res, err = red:read_reply()
+            if not res then
+                ngx.say("1: failed to read reply: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+            red:set_timeout(1000) -- 1s
+
+            res, err = red:unsubscribe()
+            if not res then
+                ngx.say("1: failed to unscribe: ", err)
+            else
+                ngx.say("1: unsubscribe: ", cjson.encode(res))
+            end
+
+            res, err = red:read_reply()
+            if not res then
+                ngx.say("1: failed to read reply: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+
+            red:set_timeout(10) -- 10ms
+            res, err = red:read_reply()
+            if not res then
+                ngx.say("1: failed to read reply: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+            red:set_timeout(1000) -- 1s
+
+            res, err = red:set("dog", 1)
+            if not res then
+                ngx.say("1: failed to set: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+
+            red:close()
+            red2:close()
+        }
+    }
+--- response_body_like chop
+^1: subscribe dog: \["subscribe","dog",1\]
+1: subscribe cat: \["subscribe","cat",2\]
+2: publish: 1
+1: receive: \["message","dog","Hello"\]
+1: failed to read reply: timeout
+1: unsubscribe: \[\["unsubscribe","(?:dog|cat)",1\],\["unsubscribe","(?:dog|cat)",0\]\]
+1: failed to read reply: not subscribed
+1: failed to read reply: not subscribed
+1: receive: "OK"
+$
+--- stream_conf_enable
+--- grep_error_log eval
+qr/log redis request \[[^]]+\]/
+--- grep_error_log_out
+log redis request ["subscribe","dog"]
+log redis request ["subscribe","cat"]
+log redis request ["publish","dog","Hello"]
+log redis request ["unsubscribe"]
+log redis request ["set","dog","1"]
+
+
+
+=== TEST 13: psubscribe & punsubscribe
+--- stream_extra_init_by_lua
+            local cjson = require "cjson"
+            local redis_proto = require("apisix.stream.xrpc.protocols.redis")
+            redis_proto.log = function(sess, ctx)
+                ngx.log(ngx.WARN, "log redis request ", cjson.encode(ctx.cmd_line))
+            end
+
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require "cjson"
+            local redis = require "resty.redis"
+
+            local red = redis:new()
+            local red2 = redis:new()
+
+            red:set_timeout(1000) -- 1 sec
+            red2:set_timeout(1000) -- 1 sec
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("1: failed to connect: ", err)
+                return
+            end
+
+            ok, err = red2:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("2: failed to connect: ", err)
+                return
+            end
+
+            local res, err = red:psubscribe("dog*", "cat*")
+            if not res then
+                ngx.say("1: failed to subscribe: ", err)
+                return
+            end
+
+            ngx.say("1: psubscribe: ", cjson.encode(res))
+
+            res, err = red2:publish("dog1", "Hello")
+            if not res then
+                ngx.say("2: failed to publish: ", err)
+                return
+            end
+
+            ngx.say("2: publish: ", cjson.encode(res))
+
+            res, err = red:read_reply()
+            if not res then
+                ngx.say("1: failed to read reply: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+
+            res, err = red:punsubscribe("cat*", "dog*")
+            if not res then
+                ngx.say("1: failed to unscribe: ", err)
+            else
+                ngx.say("1: punsubscribe: ", cjson.encode(res))
+            end
+
+            res, err = red:set("dog", 1)
+            if not res then
+                ngx.say("1: failed to set: ", err)
+            else
+                ngx.say("1: receive: ", cjson.encode(res))
+            end
+
+            red:close()
+            red2:close()
+        }
+    }
+--- response_body_like chop
+^1: psubscribe: \[\["psubscribe","dog\*",1\],\["psubscribe","cat\*",2\]\]
+2: publish: 1
+1: receive: \["pmessage","dog\*","dog1","Hello"\]
+1: punsubscribe: \[\["punsubscribe","cat\*",1\],\["punsubscribe","dog\*",0\]\]
+1: receive: "OK"
+$
+--- stream_conf_enable
+--- grep_error_log eval
+qr/log redis request \[[^]]+\]/
+--- grep_error_log_out
+log redis request ["psubscribe","dog*","cat*"]
+log redis request ["publish","dog1","Hello"]
+log redis request ["punsubscribe","cat*","dog*"]
+log redis request ["set","dog","1"]