You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by GitBox <gi...@apache.org> on 2022/10/11 08:47:54 UTC

[GitHub] [incubator-brpc] wwbmmm commented on a diff in pull request #1947: split large data when using brpc streaming

wwbmmm commented on code in PR #1947:
URL: https://github.com/apache/incubator-brpc/pull/1947#discussion_r992022722


##########
src/brpc/stream.cpp:
##########
@@ -35,7 +35,8 @@
 namespace brpc {
 
 DECLARE_bool(usercode_in_pthread);
-
+DEFINE_uint64(max_trans_unit_size, 64 * 1024 * 1024,

Review Comment:
   max_stream_data_frame_size



##########
docs/en/streaming_rpc.md:
##########
@@ -16,8 +16,7 @@ Streaming RPC ensures/provides:
 - Full duplex
 - Flow control
 - Notification on timeout
-
-We do not support segment large messages automatically so that multiple Streams on a single TCP connection may lead to [Head-of-line blocking](https://en.wikipedia.org/wiki/Head-of-line_blocking) problem. Please avoid putting huge data into single message until we provide automatic segmentation.
+- support segment large messages automaticall

Review Comment:
   automatically



##########
src/brpc/stream.cpp:
##########
@@ -140,20 +141,30 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
         errno = EBADF;
         return -1;
     }
-    butil::IOBuf out;
     ssize_t len = 0;
     for (size_t i = 0; i < size; ++i) {
+      butil::IOBuf *data = data_list[i];
+      size_t length = data->length();
+      uint64_t trans_unit = FLAGS_max_trans_unit_size;
+      int packet_num = ceil((double)length / (double)trans_unit);

Review Comment:
   trans_unit可能为0



##########
src/brpc/stream.cpp:
##########
@@ -140,20 +141,30 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
         errno = EBADF;
         return -1;
     }
-    butil::IOBuf out;
     ssize_t len = 0;
     for (size_t i = 0; i < size; ++i) {
+      butil::IOBuf *data = data_list[i];
+      size_t length = data->length();
+      uint64_t trans_unit = FLAGS_max_trans_unit_size;
+      int packet_num = ceil((double)length / (double)trans_unit);
+
+      butil::IOBuf split_data;
+      for (int j = 0; j < packet_num; j++) {

Review Comment:
   用while (!data->empty()) 来判断是不是更直接。也不用计算packet_num了。



##########
src/brpc/stream.cpp:
##########
@@ -140,20 +141,30 @@ ssize_t Stream::CutMessageIntoFileDescriptor(int /*fd*/,
         errno = EBADF;
         return -1;
     }
-    butil::IOBuf out;
     ssize_t len = 0;
     for (size_t i = 0; i < size; ++i) {
+      butil::IOBuf *data = data_list[i];
+      size_t length = data->length();
+      uint64_t trans_unit = FLAGS_max_trans_unit_size;
+      int packet_num = ceil((double)length / (double)trans_unit);
+
+      butil::IOBuf split_data;
+      for (int j = 0; j < packet_num; j++) {
+        butil::IOBuf out;
+        data->cutn(&split_data, trans_unit);
+        bool has_continuation = (j != packet_num - 1);
         StreamFrameMeta fm;
         fm.set_stream_id(_remote_settings.stream_id());
         fm.set_source_stream_id(id());
         fm.set_frame_type(FRAME_TYPE_DATA);
-        // TODO: split large data
-        fm.set_has_continuation(false);
-        policy::PackStreamMessage(&out, fm, data_list[i]);
-        len += data_list[i]->length();
-        data_list[i]->clear();
+        fm.set_has_continuation(has_continuation);
+        policy::PackStreamMessage(&out, fm, &split_data);
+        WriteToHostSocket(&out);

Review Comment:
   如果has_continuation再Write
   否则放到最后统一Write



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org