Posted to dev@brpc.apache.org by GitBox <gi...@apache.org> on 2022/08/04 10:24:14 UTC

[GitHub] [incubator-brpc] iamwuyunfeng opened a new issue, #1875: brpc stream: the receiver may have disconnected while the sender remains completely unaware

iamwuyunfeng opened a new issue, #1875:
URL: https://github.com/apache/incubator-brpc/issues/1875

   **Describe the bug**
   Suppose both the sender and the receiver use a max_body_size of 64M (the default value of FLAGS_max_body_size). Sending a 128M message through a stream succeeds on the sender side, but the receiver fails to accept it and its end of the stream gets closed, while the sender has no way of noticing that the stream is closed. Concretely:
   1. The on_closed callback of the handler registered on the sender side is never invoked.
   2. brpc::StreamWrite does not return EINVAL, the error code that indicates the stream has been closed, but returns EAGAIN instead. This contradicts the API documentation, which also says that on EAGAIN the caller may call brpc::StreamWait to wait for the receiver to consume the buffered messages.
   3. If the caller then calls brpc::StreamWait, it blocks forever when no timeout is given; with a timeout it still does not return EINVAL to indicate that the stream has been closed, but returns ETIMEDOUT after the timeout expires, so a user who retries brpc::StreamWait in a loop can spin indefinitely (a minimal sketch of this retry pattern follows this list).
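   The following is a minimal sketch of the sender-side pattern implied by the documented StreamWrite/StreamWait contract, with a retry cap added defensively; the helper name, the cap, and the 1s deadline are choices made for this sketch, not anything brpc provides:
   `#include <cerrno>
   #include <brpc/stream.h>
   #include <butil/iobuf.h>
   #include <butil/time.h>

   // Bounded write-retry loop following the documented StreamWrite/StreamWait
   // contract (0 = success, EAGAIN = buffer full, EINVAL = stream closed).
   int WriteWithBoundedRetry(brpc::StreamId stream_id,
                             const butil::IOBuf& payload,
                             int max_retries) {
       for (int i = 0; i < max_retries; ++i) {
           const int wrc = brpc::StreamWrite(stream_id, payload);
           if (wrc != EAGAIN) {
               return wrc;  // 0 on success, or e.g. EINVAL once the close is visible
           }
           // Buffer reported full: wait with an absolute deadline rather than
           // blocking forever, since (per this report) a dead peer never drains it.
           timespec due_time = butil::milliseconds_from_now(1000);
           const int rc = brpc::StreamWait(stream_id, &due_time);
           if (rc != 0 && rc != ETIMEDOUT) {
               return rc;   // propagate errors other than a timeout
           }
       }
       return EAGAIN;       // give up instead of spinning indefinitely
   }`
   With the current behavior, every call in such a loop keeps returning EAGAIN/ETIMEDOUT even though the peer has already closed the connection, which is why a cap is needed at all.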
   
   **To Reproduce**
   Add the following unit test to brpc_streaming_rpc_unittest.cpp and rebuild:
   `TEST_F(StreamingRpcTest, auto_close_if_receive_too_big_data) {
       HandlerControl hc;
       OrderedInputHandler handler(&hc);
       hc.block = true;
       brpc::StreamOptions opt;
       opt.handler = &handler;
       const int N = 10000;
       opt.max_buf_size = sizeof(uint32_t) * N;
       brpc::Server server;
       MyServiceWithStream service(opt);
       ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
       ASSERT_EQ(0, server.Start(9007, NULL));
       brpc::Channel channel;
       ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
       brpc::Controller cntl;
       brpc::StreamId request_stream;
       brpc::StreamOptions request_stream_options;
       request_stream_options.max_buf_size = sizeof(uint32_t) * N;
       ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
       brpc::ScopedStream stream_guard(request_stream);
       test::EchoService_Stub stub(&channel);
       stub.Echo(&cntl, &request, &response, NULL);
       ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
       butil::IOBuf out;
       out.append(std::string(brpc::FLAGS_max_body_size, '0'));
       ASSERT_EQ(0, brpc::StreamWrite(request_stream, out));
       ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
       timespec wait_timeout = butil::seconds_to_timespec(1);
       // FIXME: should notice that stream is closed and return EINVAL to avoid infinite loop
       while (brpc::StreamWait(request_stream, &wait_timeout) == ETIMEDOUT) {
           usleep(100);
       }
       while (!handler.stopped()) {
           usleep(100);
       }
       ASSERT_FALSE(handler.failed());
       ASSERT_EQ(0, handler.idle_times());
       ASSERT_EQ(0, handler._expected_next_value);
   }`
   Then run:
   `cd build; ./test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data`
   
   **Expected behavior**
   The unit test added above should pass.
   
   **Versions**
   OS: CentOS7
   Compiler: gcc-9.3.1
   brpc: master (53f6436a63f4b4c23d22b9e07e5d6d9af8af7fe7)
   protobuf: 3.6.1
   
   **Additional context/screenshots**
   
   > [root@orkv-mgr incubator-brpc]# ./build/test/brpc_streaming_rpc_unittest --gtest_filter=StreamingRpcTest.auto_close_if_receive_too_big_data
   Running main() from gtest_main.cc
   Note: Google Test filter = StreamingRpcTest.auto_close_if_receive_too_big_data
   [==========] Running 1 test from 1 test case.
   [----------] Global test environment set-up.
   [----------] 1 test from StreamingRpcTest
   [ RUN      ] StreamingRpcTest.auto_close_if_receive_too_big_data
   I0804 18:21:34.318899 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1046] Server[MyServiceWithStream] is serving on port=9007.
   I0804 18:21:34.319361 36258 /root/github/rpc/incubator-brpc/src/brpc/server.cpp:1049] Check out http://orkv-mgr:9007 in web browser.
   I0804 18:21:34.321214 36261 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:78] Created response_stream=8589934595
   E0804 18:21:34.409823 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:115] A message from 127.0.0.1:46330(protocol=streaming_rpc) is bigger than 67108864 bytes, the connection will be closed. Set max_body_size to allow bigger messages
   W0804 18:21:34.410057 36290 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:250] Close Socket{id=128 fd=11 addr=127.0.0.1:46330:9007} (0x3664000): too big data
   I0804 18:21:34.413154 36265 /root/github/rpc/incubator-brpc/test/brpc_streaming_rpc_unittest.cpp:42] Service side stream: 8589934595 is closed
   W0804 18:21:34.413194 36306 /root/github/rpc/incubator-brpc/src/brpc/input_messenger.cpp:214] Fail to read from Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Connection reset by peer
   W0804 18:21:34.413346 36272 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:1616] Fail to keep-write into Socket{id=1 fd=10 addr=127.0.0.1:9007:46330} (0x33ca200): Broken pipe
   I0804 18:21:34.513589 36281 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2202] Checking Socket{id=1 addr=127.0.0.1:9007} (0x33ca200)
   I0804 18:21:34.513832 36300 /root/github/rpc/incubator-brpc/src/brpc/socket.cpp:2262] Revived Socket{id=1 addr=127.0.0.1:9007} (0x33ca200) (Connectable)
   
   The 128M used in the example above may be extreme, but the main risk this problem creates is that the sender generally has no idea what the receiver's FLAGS_max_body_size is, so in theory any message the sender writes may exceed the receiver's FLAGS_max_body_size, yet it is the sender that bears the consequence (since the broken connection goes unnoticed, the sending server may keep trying to write forever).
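   This is not a fix for the missing EINVAL, but one possible sender-side mitigation until this is addressed: keep each individual StreamWrite below a conservative per-write limit, since, as the log above shows, it is a single oversized streaming-rpc message that trips the receiver's max_body_size check and closes the connection. The helper name, the chunk-size parameter, and the assumption that the receiver reassembles the chunks are all specific to this sketch, not part of brpc:
   `#include <cstddef>
   #include <brpc/stream.h>
   #include <butil/iobuf.h>

   // Cut the payload into chunks of at most max_chunk_bytes and write them one
   // by one; the receiver is assumed to reassemble them by itself.
   int WriteInChunks(brpc::StreamId stream_id,
                     butil::IOBuf payload,          // consumed as chunks are cut off
                     size_t max_chunk_bytes) {
       while (!payload.empty()) {
           butil::IOBuf chunk;
           payload.cutn(&chunk, max_chunk_bytes);   // cuts at most max_chunk_bytes
           const int rc = brpc::StreamWrite(stream_id, chunk);
           if (rc != 0) {
               return rc;                           // EAGAIN/EINVAL left to the caller
           }
       }
       return 0;
   }`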


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org