You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ja...@apache.org on 2020/11/29 08:07:44 UTC

[incubator-brpc] branch master updated: Support pass down request_id

This is an automated email from the ASF dual-hosted git repository.

jamesge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 20f2b4a  Support pass down request_id
20f2b4a is described below

commit 20f2b4a887bb693382b47b7ff24a726d29eb224d
Author: jamesge <jg...@gmail.com>
AuthorDate: Sun Nov 29 16:07:25 2020 +0800

    Support pass down request_id
---
 example/cascade_echo_c++/client.cpp    | 10 +++++----
 example/cascade_echo_c++/server.cpp    | 10 ++++-----
 src/brpc/controller.cpp                | 23 +++++++++++++-------
 src/brpc/controller.h                  | 39 +++++++++++++++++++++++++++++-----
 src/brpc/policy/baidu_rpc_meta.proto   |  1 +
 src/brpc/policy/baidu_rpc_protocol.cpp |  6 ++++++
 src/brpc/policy/http_rpc_protocol.cpp  | 16 +++++++++++---
 src/butil/fast_rand.cpp                | 15 +++++++++++++
 src/butil/fast_rand.h                  |  7 +++++-
 test/brpc_controller_unittest.cpp      |  2 +-
 10 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/example/cascade_echo_c++/client.cpp b/example/cascade_echo_c++/client.cpp
index 1bb7005..cb545b6 100644
--- a/example/cascade_echo_c++/client.cpp
+++ b/example/cascade_echo_c++/client.cpp
@@ -26,8 +26,9 @@
 #include <brpc/server.h>
 #include "echo.pb.h"
 #include <bvar/bvar.h>
+#include <butil/fast_rand.h>
 
-DEFINE_int32(thread_num, 4, "Number of threads to send requests");
+DEFINE_int32(thread_num, 2, "Number of threads to send requests");
 DEFINE_bool(use_bthread, false, "Use bthread to send requests");
 DEFINE_string(attachment, "foo", "Carry this along with requests");
 DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
@@ -38,7 +39,7 @@ DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
 DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
 DEFINE_int32(depth, 0, "number of loop calls");
 // Don't send too frequently in this example
-DEFINE_int32(sleep_ms, 100, "milliseconds to sleep after each RPC");
+DEFINE_int32(sleep_ms, 1000, "milliseconds to sleep after each RPC");
 DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
 
 bvar::LatencyRecorder g_latency_recorder("client");
@@ -50,7 +51,6 @@ void* sender(void* arg) {
     example::EchoService_Stub stub(chan);
 
     // Send a request and wait for the response every 1 second.
-    int log_id = 0;
     while (!brpc::IsAskedToQuit()) {
         // We will receive response synchronously, safe to put variables
         // on stack.
@@ -63,7 +63,9 @@ void* sender(void* arg) {
             request.set_depth(FLAGS_depth);
         }
 
-        cntl.set_log_id(log_id ++);  // set by user
+        // Set request_id to be a random string
+        cntl.set_request_id(butil::fast_rand_printable(9));
+
         // Set attachment which is wired to network directly instead of 
         // being serialized into protobuf messages.
         cntl.request_attachment().append(FLAGS_attachment);
diff --git a/example/cascade_echo_c++/server.cpp b/example/cascade_echo_c++/server.cpp
index b8f1c04..47f1dd9 100644
--- a/example/cascade_echo_c++/server.cpp
+++ b/example/cascade_echo_c++/server.cpp
@@ -52,27 +52,25 @@ public:
             static_cast<brpc::Controller*>(cntl_base);
 
         if (request->depth() > 0) {
-            TRACEPRINTF("I'm about to call myself for another time, depth=%d",
-                        request->depth());
+            CLOGI(cntl) << "I'm about to call myself for another time, depth=" << request->depth();
             example::EchoService_Stub stub(&channel);
             example::EchoRequest request2;
             example::EchoResponse response2;
-            brpc::Controller cntl2;
+            brpc::Controller cntl2(cntl->inheritable());
             request2.set_message(request->message());
             request2.set_depth(request->depth() - 1);
 
-            cntl2.set_log_id(cntl->log_id());
             cntl2.set_timeout_ms(FLAGS_timeout_ms);
             cntl2.set_max_retry(FLAGS_max_retry);
             stub.Echo(&cntl2, &request2, &response2, NULL);
             if (cntl2.Failed()) {
-                LOG(ERROR) << "Fail to send EchoRequest, " << cntl2.ErrorText();
+                CLOGE(&cntl2) << "Fail to send EchoRequest, " << cntl2.ErrorText();
                 cntl->SetFailed(cntl2.ErrorCode(), "%s", cntl2.ErrorText().c_str());
                 return;
             }
             response->set_message(response2.message());
         } else {
-            TRACEPRINTF("I'm the last call");
+            CLOGI(cntl) << "I'm the last call";
             response->set_message(request->message());
         }
         
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index a3c77e0..d16f35d 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -83,7 +83,6 @@ namespace brpc {
 
 DEFINE_bool(graceful_quit_on_sigterm, false,
             "Register SIGTERM handle func to quit graceful");
-DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
 
 const IdlNames idl_single_req_single_res = { "req", "res" };
 const IdlNames idl_single_req_multi_res = { "req", "" };
@@ -130,6 +129,13 @@ Controller::Controller() {
     ResetPods();
 }
 
+Controller::Controller(const Inheritable& parent_ctx) {
+    CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
+    *g_ncontroller << 1;
+    ResetPods();
+    _inheritable = parent_ctx;  // assigned after ResetPods(), which resets _inheritable
+}
+
 struct SessionKVFlusher {
     Controller* cntl;
 };
@@ -247,7 +253,7 @@ void Controller::ResetPods() {
     _response_compress_type = COMPRESS_TYPE_NONE;
     _fail_limit = UNSET_MAGIC_NUM;
     _pipelined_count = 0;
-    _log_id = 0;
+    _inheritable.Reset();
     _pchan_sub_count = 0;
     _response = NULL;
     _done = NULL;
@@ -330,7 +336,7 @@ void Controller::set_max_retry(int max_retry) {
 
 void Controller::set_log_id(uint64_t log_id) {
     add_flag(FLAGS_LOG_ID);
-    _log_id = log_id;
+    _inheritable.log_id = log_id;
 }
 
 
@@ -1249,7 +1255,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
     s->tos = _tos;
     s->connection_type = _connection_type;
     s->request_compress_type = _request_compress_type;
-    s->log_id = _log_id;
+    s->log_id = log_id();
     s->has_request_code = has_request_code();
     s->request_code = _request_code;
 }
@@ -1517,8 +1523,8 @@ void Controller::FlushSessionKV(std::ostream& os) {
     }
 
     const std::string* pRID = nullptr;
-    if (_http_request) {
-        pRID = _http_request->GetHeader(FLAGS_request_id_header);
+    if (!request_id().empty()) {
+        pRID = &request_id();
     }
 
     if (FLAGS_log_as_json) {
@@ -1544,10 +1550,11 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p)
     p.DoPrintLogPrefix(os);
     return os;
 }
+
 void Controller::DoPrintLogPrefix(std::ostream& os) const {
     const std::string* pRID = nullptr;
-    if (_http_request) {
-        pRID = _http_request->GetHeader(FLAGS_request_id_header);
+    if (!request_id().empty()) {
+        pRID = &request_id();
         if (pRID) {
             if (FLAGS_log_as_json) {
                 os << BRPC_REQ_ID "\":\"" << *pRID << "\",";
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 9118633..e4e3533 100755
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -23,6 +23,7 @@
 // on internal structures, use opaque pointers instead.
 
 #include <gflags/gflags.h>                     // Users often need gflags
+#include <string>
 #include "butil/intrusive_ptr.hpp"             // butil::intrusive_ptr
 #include "bthread/errno.h"                     // Redefine errno
 #include "butil/endpoint.h"                    // butil::EndPoint
@@ -142,9 +143,22 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
     static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
     static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
-    
+
+public:
+    struct Inheritable {  // session info a child Controller copies from its parent
+        Inheritable() : log_id(0) {}
+        void Reset() {  // back to the freshly-constructed state
+            log_id = 0;
+            request_id.clear();
+        }
+
+        uint64_t log_id;         // written by Controller::set_log_id()
+        std::string request_id;  // written by Controller::set_request_id()
+    };
+
 public:
     Controller();
+    Controller(const Inheritable& parent_ctx);
     ~Controller();
     
     // ------------------------------------------------------------------
@@ -200,6 +214,8 @@ public:
     // queries following the topology of servers) with a same log_id.
     void set_log_id(uint64_t log_id);
 
+    void set_request_id(std::string request_id) { _inheritable.request_id.swap(request_id); }
+
     // Set type of service: http://en.wikipedia.org/wiki/Type_of_service
     // Current implementation has limits: If the connection is already
     // established, this setting has no effect until the connection is broken
@@ -470,8 +486,10 @@ public:
     int ErrorCode() const { return _error_code; }
 
     // Getters:
+    const Inheritable& inheritable() const { return _inheritable; }  // for child controllers
     bool has_log_id() const { return has_flag(FLAGS_LOG_ID); }
-    uint64_t log_id() const { return _log_id; }
+    uint64_t log_id() const { return _inheritable.log_id; }
+    const std::string& request_id() const { return _inheritable.request_id; }
     CompressType request_compress_type() const { return _request_compress_type; }
     CompressType response_compress_type() const { return _response_compress_type; }
     const HttpHeader& http_request() const 
@@ -731,7 +749,7 @@ private:
     int _preferred_index;
     CompressType _request_compress_type;
     CompressType _response_compress_type;
-    uint64_t _log_id;
+    Inheritable _inheritable;
     int _pchan_sub_count;
     google::protobuf::Message* _response;
     google::protobuf::Closure* _done;
@@ -814,8 +832,19 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p);
 
 } // namespace brpc
 
-// Print contextual logs with @rid which is got from "x-request-id"(changable
-// by -request_id_header) in http header by default
+// Print contextual logs prefixed with "@rid=REQUEST_ID" which marks a session
+// and eases debugging. The REQUEST_ID is carried in http/rpc request or 
+// inherited from another controller.
+// As a server:
+//   Call CLOG*(cntl) << ... to log instead of LOG(*) << ...
+// As a client:
+//   Inside a service:
+//     Use Controller(service_cntl->inheritable()) to create controllers which 
+//     inherit session info from the service's requests
+//   Standalone brpc client:
+//     Call cntl->set_request_id(REQUEST_ID);
+//   Standalone http client:
+//     Set the http header named by -request_id_header ('x-request-id' by default)
 #define CLOGD(cntl) LOG(DEBUG) << (cntl)->LogPrefix()
 #define CLOGI(cntl) LOG(INFO) << (cntl)->LogPrefix()
 #define CLOGW(cntl) LOG(WARNING) << (cntl)->LogPrefix()
diff --git a/src/brpc/policy/baidu_rpc_meta.proto b/src/brpc/policy/baidu_rpc_meta.proto
index 5ba335c..dfff8ed 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -41,6 +41,7 @@ message RpcRequestMeta {
     optional int64 trace_id = 4;
     optional int64 span_id = 5;
     optional int64 parent_span_id = 6;
+    optional string request_id = 7; // corresponds to x-request-id in http header
 }
 
 message RpcResponseMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 88197f6..fb91f5e 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -343,6 +343,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
     if (request_meta.has_log_id()) {
         cntl->set_log_id(request_meta.log_id());
     }
+    if (request_meta.has_request_id()) {
+        cntl->set_request_id(request_meta.request_id());
+    }
     cntl->set_request_compress_type((CompressType)meta.compress_type());
     accessor.set_server(server)
         .set_security_mode(security_mode)
@@ -648,6 +651,9 @@ void PackRpcRequest(butil::IOBuf* req_buf,
     if (cntl->has_log_id()) {
         request_meta->set_log_id(cntl->log_id());
     }
+    if (!cntl->request_id().empty()) {
+        request_meta->set_request_id(cntl->request_id());
+    }
     meta.set_correlation_id(correlation_id);
     StreamId request_stream_id = accessor.request_stream();
     if (request_stream_id != INVALID_STREAM_ID) {
diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp
index c175f4f..7bd06ec 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -67,10 +67,13 @@ DEFINE_string(http_header_of_user_ip, "", "http requests sent by proxies may "
               "brpc will read ip:port from the specified header for "
               "authorization and set Controller::remote_side()");
 
-DEFINE_bool(pb_enum_as_number, false, "[Not recommended] Convert enums in "
+DEFINE_bool(pb_enum_as_number, false,
+            "[Not recommended] Convert enums in "
             "protobuf to json as numbers, affecting both client-side and "
             "server-side");
 
+DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
+
 // Read user address from the header specified by -http_header_of_user_ip
 static bool GetUserAddressFromHeaderImpl(const HttpHeader& headers,
                                          butil::EndPoint* user_addr) {
@@ -566,8 +569,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
     // Fill log-id if user set it.
     if (cntl->has_log_id()) {
         hreq.SetHeader(common->LOG_ID,
-                          butil::string_printf(
-                              "%llu", (unsigned long long)cntl->log_id()));
+                       butil::string_printf("%llu", (unsigned long long)cntl->log_id()));
+    }
+    if (!cntl->request_id().empty()) {
+        hreq.SetHeader(FLAGS_request_id_header, cntl->request_id());
     }
 
     if (!is_http2) {
@@ -1264,6 +1269,11 @@ void ProcessHttpRequest(InputMessageBase *msg) {
         }
     }
 
+    const std::string* request_id = req_header.GetHeader(FLAGS_request_id_header);
+    if (request_id) {
+        cntl->set_request_id(*request_id);
+    }
+
     // Tag the bthread with this server's key for
     // thread_local_data().
     if (server->thread_local_options().thread_local_data_factory) {
diff --git a/src/butil/fast_rand.cpp b/src/butil/fast_rand.cpp
index 1bd80c9..36e0e83 100644
--- a/src/butil/fast_rand.cpp
+++ b/src/butil/fast_rand.cpp
@@ -178,4 +178,19 @@ void fast_rand_bytes(void* output, size_t output_length) {
     }
 }
 
+std::string fast_rand_printable(size_t length) {
+    std::string result(length, 0);
+    const size_t halflen = length/2;  // each random byte yields two output chars
+    fast_rand_bytes(&result[0], halflen);  // random bytes fill the first halflen slots
+    for (size_t i = 0; i < halflen; ++i) {  // expand in place, last byte first
+        const uint8_t b = result[halflen - 1 - i];
+        result[length - 1 - 2*i] = 'A' + (b & 0xF);  // low nibble -> 'A'..'P'
+        result[length - 2 - 2*i] = 'A' + (b >> 4);  // high nibble -> 'A'..'P'
+    }
+    if (halflen * 2 != length) {  // odd length: the loop never writes result[0]
+        result[0] = 'A' + (fast_rand() % 16);
+    }
+    return result;
+}
+
 }  // namespace butil
diff --git a/src/butil/fast_rand.h b/src/butil/fast_rand.h
index ed941d6..b2ccb4f 100644
--- a/src/butil/fast_rand.h
+++ b/src/butil/fast_rand.h
@@ -20,7 +20,9 @@
 #ifndef BUTIL_FAST_RAND_H
 #define BUTIL_FAST_RAND_H
 
+#include <cstddef>
 #include <stdint.h>
+#include <string>
 
 namespace butil {
 
@@ -66,7 +68,10 @@ template <typename T> T fast_rand_in(T min, T max) {
 double fast_rand_double();
 
 // Fills |output_length| bytes of |output| with random data.
-void fast_rand_bytes(void* output, size_t output_length, uint8_t min);
+void fast_rand_bytes(void *output, size_t output_length);
+
+// Generate a random printable string of |length| characters, each in 'A'..'P'
+std::string fast_rand_printable(size_t length);
 
 }
 
diff --git a/test/brpc_controller_unittest.cpp b/test/brpc_controller_unittest.cpp
index 0def06d..f733328 100644
--- a/test/brpc_controller_unittest.cpp
+++ b/test/brpc_controller_unittest.cpp
@@ -122,7 +122,7 @@ TEST_F(ControllerTest, SessionKV) {
         ASSERT_TRUE(startsWith(sink1, "W")) << sink1;
         sink1.clear();
 
-        cntl.http_request().SetHeader("x-request-id", "abcdEFG-456");
+        cntl.set_request_id("abcdEFG-456");
         CLOGE(&cntl) << "My ERROR Log";
         ASSERT_TRUE(endsWith(sink1, "] @rid=abcdEFG-456 My ERROR Log")) << sink1;
         ASSERT_TRUE(startsWith(sink1, "E")) << sink1;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org