You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/12 09:52:12 UTC

[apisix] branch master updated: fix(xRPC): log down unfinished request (#7014)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new bcc7f3876 fix(xRPC): log down unfinished request (#7014)
bcc7f3876 is described below

commit bcc7f38766280cd64742165c8f7d75271062f1c8
Author: 罗泽轩 <sp...@gmail.com>
AuthorDate: Thu May 12 17:52:08 2022 +0800

    fix(xRPC): log down unfinished request (#7014)
    
    Signed-off-by: spacewander <sp...@gmail.com>
---
 apisix/stream/xrpc/runner.lua                      | 62 +++++++++++-----------
 .../apisix/stream/xrpc/protocols/pingpong/init.lua |  2 +-
 t/xrpc/pingpong.t                                  |  7 +++
 3 files changed, 40 insertions(+), 31 deletions(-)

diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 7296a69b7..8c082a196 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -46,31 +46,6 @@ local function open_session(conn_ctx)
 end
 
 
-local function close_session(session, protocol)
-    local upstream_ctx = session._upstream_ctx
-    if upstream_ctx then
-        upstream_ctx.closed = true
-
-        local up = upstream_ctx.upstream
-        protocol.disconnect_upstream(session, up)
-    end
-
-    local upstream_ctxs = session._upstream_ctxs
-    if upstream_ctxs then
-        for _, upstream_ctx in pairs(upstream_ctxs) do
-            upstream_ctx.closed = true
-
-            local up = upstream_ctx.upstream
-            protocol.disconnect_upstream(session, up)
-        end
-    end
-
-    for id in pairs(session._ctxs) do
-        core.log.notice("RPC is not finished, id: ", id)
-    end
-end
-
-
 local function put_req_ctx(session, ctx)
     local id = ctx._id
     session._ctxs[id] = nil
@@ -114,7 +89,7 @@ local function run_log_plugin(ctx, logger)
 end
 
 
-local function finish_req(protocol, session, ctx)
+local function finialize_req(protocol, session, ctx)
     ctx._rpc_end_time = ngx_now()
 
     local loggers = session.route.protocol.logger
@@ -134,6 +109,33 @@ local function finish_req(protocol, session, ctx)
 end
 
 
+local function close_session(session, protocol)
+    local upstream_ctx = session._upstream_ctx
+    if upstream_ctx then
+        upstream_ctx.closed = true
+
+        local up = upstream_ctx.upstream
+        protocol.disconnect_upstream(session, up)
+    end
+
+    local upstream_ctxs = session._upstream_ctxs
+    if upstream_ctxs then
+        for _, upstream_ctx in pairs(upstream_ctxs) do
+            upstream_ctx.closed = true
+
+            local up = upstream_ctx.upstream
+            protocol.disconnect_upstream(session, up)
+        end
+    end
+
+    for id, ctx in pairs(session._ctxs) do
+        core.log.notice("RPC is not finished, id: ", id)
+        ctx.unfinished = true
+        finialize_req(protocol, session, ctx)
+    end
+end
+
+
 local function open_upstream(protocol, session, ctx)
     local key = session._upstream_key
     session._upstream_key = nil
@@ -180,7 +182,7 @@ local function start_upstream_coroutine(session, protocol, downstream, up_ctx)
         local status, ctx = protocol.from_upstream(session, downstream, upstream)
         if status ~= OK then
             if ctx ~= nil then
-                finish_req(protocol, session, ctx)
+                finialize_req(protocol, session, ctx)
             end
 
             if status == DECLINED then
@@ -207,7 +209,7 @@ function _M.run(protocol, conn_ctx)
         local status, ctx = protocol.from_downstream(session, downstream)
         if status ~= OK then
             if ctx ~= nil then
-                finish_req(protocol, session, ctx)
+                finialize_req(protocol, session, ctx)
             end
 
             if status == DECLINED then
@@ -225,7 +227,7 @@ function _M.run(protocol, conn_ctx)
         local status, up_ctx = open_upstream(protocol, session, ctx)
         if status ~= OK then
             if ctx ~= nil then
-                finish_req(protocol, session, ctx)
+                finialize_req(protocol, session, ctx)
             end
 
             break
@@ -234,7 +236,7 @@ function _M.run(protocol, conn_ctx)
         status = protocol.to_upstream(session, ctx, downstream, up_ctx.upstream)
         if status ~= OK then
             if ctx ~= nil then
-                finish_req(protocol, session, ctx)
+                finialize_req(protocol, session, ctx)
             end
 
             if status == DECLINED then
diff --git a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
index 56b32fb31..013725832 100644
--- a/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
+++ b/t/xrpc/apisix/stream/xrpc/protocols/pingpong/init.lua
@@ -276,7 +276,7 @@ end
 
 
 function _M.log(session, ctx)
-    core.log.info("call pingpong's log")
+    core.log.info("call pingpong's log, ctx unfinished: ", ctx.unfinished == true)
 end
 
 
diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t
index 64e4e055a..c002d0960 100644
--- a/t/xrpc/pingpong.t
+++ b/t/xrpc/pingpong.t
@@ -359,6 +359,7 @@ passed
 "pp\x03\x00\x02\x00\x00\x00\x00\x04ABCD"
 --- error_log
 RPC is not finished
+call pingpong's log, ctx unfinished: true
 --- no_error_log
 [error]
 
@@ -410,6 +411,12 @@ RPC is not finished
 "pp\x03\x00\x03\x00\x00\x00\x00\x03ABC" .
 "pp\x03\x00\x02\x00\x00\x00\x00\x02AB" .
 "pp\x03\x00\x01\x00\x00\x00\x00\x01A"
+--- grep_error_log eval
+qr/call pingpong's log, ctx unfinished: \w+/
+--- grep_error_log_out
+call pingpong's log, ctx unfinished: false
+call pingpong's log, ctx unfinished: false
+call pingpong's log, ctx unfinished: false