You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ja...@apache.org on 2020/11/29 08:07:44 UTC
[incubator-brpc] branch master updated: Support pass down request_id
This is an automated email from the ASF dual-hosted git repository.
jamesge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 20f2b4a Support pass down request_id
20f2b4a is described below
commit 20f2b4a887bb693382b47b7ff24a726d29eb224d
Author: jamesge <jg...@gmail.com>
AuthorDate: Sun Nov 29 16:07:25 2020 +0800
Support pass down request_id
---
example/cascade_echo_c++/client.cpp | 10 +++++----
example/cascade_echo_c++/server.cpp | 10 ++++-----
src/brpc/controller.cpp | 23 +++++++++++++-------
src/brpc/controller.h | 39 +++++++++++++++++++++++++++++-----
src/brpc/policy/baidu_rpc_meta.proto | 1 +
src/brpc/policy/baidu_rpc_protocol.cpp | 6 ++++++
src/brpc/policy/http_rpc_protocol.cpp | 16 +++++++++++---
src/butil/fast_rand.cpp | 15 +++++++++++++
src/butil/fast_rand.h | 7 +++++-
test/brpc_controller_unittest.cpp | 2 +-
10 files changed, 101 insertions(+), 28 deletions(-)
diff --git a/example/cascade_echo_c++/client.cpp b/example/cascade_echo_c++/client.cpp
index 1bb7005..cb545b6 100644
--- a/example/cascade_echo_c++/client.cpp
+++ b/example/cascade_echo_c++/client.cpp
@@ -26,8 +26,9 @@
#include <brpc/server.h>
#include "echo.pb.h"
#include <bvar/bvar.h>
+#include <butil/fast_rand.h>
-DEFINE_int32(thread_num, 4, "Number of threads to send requests");
+DEFINE_int32(thread_num, 2, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_string(attachment, "foo", "Carry this along with requests");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
@@ -38,7 +39,7 @@ DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
DEFINE_int32(depth, 0, "number of loop calls");
// Don't send too frequently in this example
-DEFINE_int32(sleep_ms, 100, "milliseconds to sleep after each RPC");
+DEFINE_int32(sleep_ms, 1000, "milliseconds to sleep after each RPC");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
bvar::LatencyRecorder g_latency_recorder("client");
@@ -50,7 +51,6 @@ void* sender(void* arg) {
example::EchoService_Stub stub(chan);
// Send a request and wait for the response every 1 second.
- int log_id = 0;
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
@@ -63,7 +63,9 @@ void* sender(void* arg) {
request.set_depth(FLAGS_depth);
}
- cntl.set_log_id(log_id ++); // set by user
+ // Set request_id to be a random string
+ cntl.set_request_id(butil::fast_rand_printable(9));
+
// Set attachment which is wired to network directly instead of
// being serialized into protobuf messages.
cntl.request_attachment().append(FLAGS_attachment);
diff --git a/example/cascade_echo_c++/server.cpp b/example/cascade_echo_c++/server.cpp
index b8f1c04..47f1dd9 100644
--- a/example/cascade_echo_c++/server.cpp
+++ b/example/cascade_echo_c++/server.cpp
@@ -52,27 +52,25 @@ public:
static_cast<brpc::Controller*>(cntl_base);
if (request->depth() > 0) {
- TRACEPRINTF("I'm about to call myself for another time, depth=%d",
- request->depth());
+ CLOGI(cntl) << "I'm about to call myself for another time, depth=" << request->depth();
example::EchoService_Stub stub(&channel);
example::EchoRequest request2;
example::EchoResponse response2;
- brpc::Controller cntl2;
+ brpc::Controller cntl2(cntl->inheritable());
request2.set_message(request->message());
request2.set_depth(request->depth() - 1);
- cntl2.set_log_id(cntl->log_id());
cntl2.set_timeout_ms(FLAGS_timeout_ms);
cntl2.set_max_retry(FLAGS_max_retry);
stub.Echo(&cntl2, &request2, &response2, NULL);
if (cntl2.Failed()) {
- LOG(ERROR) << "Fail to send EchoRequest, " << cntl2.ErrorText();
+ CLOGE(&cntl2) << "Fail to send EchoRequest, " << cntl2.ErrorText();
cntl->SetFailed(cntl2.ErrorCode(), "%s", cntl2.ErrorText().c_str());
return;
}
response->set_message(response2.message());
} else {
- TRACEPRINTF("I'm the last call");
+ CLOGI(cntl) << "I'm the last call";
response->set_message(request->message());
}
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index a3c77e0..d16f35d 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -83,7 +83,6 @@ namespace brpc {
DEFINE_bool(graceful_quit_on_sigterm, false,
"Register SIGTERM handle func to quit graceful");
-DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
const IdlNames idl_single_req_single_res = { "req", "res" };
const IdlNames idl_single_req_multi_res = { "req", "" };
@@ -130,6 +129,13 @@ Controller::Controller() {
ResetPods();
}
+Controller::Controller(const Inheritable& parent_ctx) {
+ CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
+ *g_ncontroller << 1;
+ ResetPods();
+ _inheritable = parent_ctx;
+}
+
struct SessionKVFlusher {
Controller* cntl;
};
@@ -247,7 +253,7 @@ void Controller::ResetPods() {
_response_compress_type = COMPRESS_TYPE_NONE;
_fail_limit = UNSET_MAGIC_NUM;
_pipelined_count = 0;
- _log_id = 0;
+ _inheritable.Reset();
_pchan_sub_count = 0;
_response = NULL;
_done = NULL;
@@ -330,7 +336,7 @@ void Controller::set_max_retry(int max_retry) {
void Controller::set_log_id(uint64_t log_id) {
add_flag(FLAGS_LOG_ID);
- _log_id = log_id;
+ _inheritable.log_id = log_id;
}
@@ -1249,7 +1255,7 @@ void Controller::SaveClientSettings(ClientSettings* s) const {
s->tos = _tos;
s->connection_type = _connection_type;
s->request_compress_type = _request_compress_type;
- s->log_id = _log_id;
+ s->log_id = log_id();
s->has_request_code = has_request_code();
s->request_code = _request_code;
}
@@ -1517,8 +1523,8 @@ void Controller::FlushSessionKV(std::ostream& os) {
}
const std::string* pRID = nullptr;
- if (_http_request) {
- pRID = _http_request->GetHeader(FLAGS_request_id_header);
+ if (!request_id().empty()) {
+ pRID = &request_id();
}
if (FLAGS_log_as_json) {
@@ -1544,10 +1550,11 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p)
p.DoPrintLogPrefix(os);
return os;
}
+
void Controller::DoPrintLogPrefix(std::ostream& os) const {
const std::string* pRID = nullptr;
- if (_http_request) {
- pRID = _http_request->GetHeader(FLAGS_request_id_header);
+ if (!request_id().empty()) {
+ pRID = &request_id();
if (pRID) {
if (FLAGS_log_as_json) {
os << BRPC_REQ_ID "\":\"" << *pRID << "\",";
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 9118633..e4e3533 100755
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -23,6 +23,7 @@
// on internal structures, use opaque pointers instead.
#include <gflags/gflags.h> // Users often need gflags
+#include <string>
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "bthread/errno.h" // Redefine errno
#include "butil/endpoint.h" // butil::EndPoint
@@ -142,9 +143,22 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
-
+
+public:
+ struct Inheritable {
+ Inheritable() : log_id(0) {}
+ void Reset() {
+ log_id = 0;
+ request_id.clear();
+ }
+
+ uint64_t log_id;
+ std::string request_id;
+ };
+
public:
Controller();
+ Controller(const Inheritable& parent_ctx);
~Controller();
// ------------------------------------------------------------------
@@ -200,6 +214,8 @@ public:
// queries following the topology of servers) with a same log_id.
void set_log_id(uint64_t log_id);
+ void set_request_id(std::string request_id) { _inheritable.request_id = request_id; }
+
// Set type of service: http://en.wikipedia.org/wiki/Type_of_service
// Current implementation has limits: If the connection is already
// established, this setting has no effect until the connection is broken
@@ -470,8 +486,10 @@ public:
int ErrorCode() const { return _error_code; }
// Getters:
+ const Inheritable& inheritable() { return _inheritable; }
bool has_log_id() const { return has_flag(FLAGS_LOG_ID); }
- uint64_t log_id() const { return _log_id; }
+ uint64_t log_id() const { return _inheritable.log_id; }
+ const std::string& request_id() const { return _inheritable.request_id; }
CompressType request_compress_type() const { return _request_compress_type; }
CompressType response_compress_type() const { return _response_compress_type; }
const HttpHeader& http_request() const
@@ -731,7 +749,7 @@ private:
int _preferred_index;
CompressType _request_compress_type;
CompressType _response_compress_type;
- uint64_t _log_id;
+ Inheritable _inheritable;
int _pchan_sub_count;
google::protobuf::Message* _response;
google::protobuf::Closure* _done;
@@ -814,8 +832,19 @@ std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p);
} // namespace brpc
-// Print contextual logs with @rid which is got from "x-request-id"(changable
-// by -request_id_header) in http header by default
+// Print contextual logs prefixed with "@rid=REQUEST_ID" which marks a session
+// and eases debugging. The REQUEST_ID is carried in http/rpc request or
+// inherited from another controller.
+// As a server:
+// Call CLOG*(cntl) << ... to log instead of LOG(*) << ..
+// As a client:
+// Inside a service:
+// Use Controller(service_cntl->inheritable()) to create controllers which
+// inherit session info from the service's requests
+// Standalone brpc client:
+// Set cntl->set_request_id(REQUEST_ID);
+// Standalone http client:
+// Set header 'X-REQUEST-ID'
#define CLOGD(cntl) LOG(DEBUG) << (cntl)->LogPrefix()
#define CLOGI(cntl) LOG(INFO) << (cntl)->LogPrefix()
#define CLOGW(cntl) LOG(WARNING) << (cntl)->LogPrefix()
diff --git a/src/brpc/policy/baidu_rpc_meta.proto b/src/brpc/policy/baidu_rpc_meta.proto
index 5ba335c..dfff8ed 100644
--- a/src/brpc/policy/baidu_rpc_meta.proto
+++ b/src/brpc/policy/baidu_rpc_meta.proto
@@ -41,6 +41,7 @@ message RpcRequestMeta {
optional int64 trace_id = 4;
optional int64 span_id = 5;
optional int64 parent_span_id = 6;
+ optional string request_id = 7; // correspond to x-request-id in http header
}
message RpcResponseMeta {
diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp
index 88197f6..fb91f5e 100644
--- a/src/brpc/policy/baidu_rpc_protocol.cpp
+++ b/src/brpc/policy/baidu_rpc_protocol.cpp
@@ -343,6 +343,9 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
if (request_meta.has_log_id()) {
cntl->set_log_id(request_meta.log_id());
}
+ if (request_meta.has_request_id()) {
+ cntl->set_request_id(request_meta.request_id());
+ }
cntl->set_request_compress_type((CompressType)meta.compress_type());
accessor.set_server(server)
.set_security_mode(security_mode)
@@ -648,6 +651,9 @@ void PackRpcRequest(butil::IOBuf* req_buf,
if (cntl->has_log_id()) {
request_meta->set_log_id(cntl->log_id());
}
+ if (!cntl->request_id().empty()) {
+ request_meta->set_request_id(cntl->request_id());
+ }
meta.set_correlation_id(correlation_id);
StreamId request_stream_id = accessor.request_stream();
if (request_stream_id != INVALID_STREAM_ID) {
diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp
index c175f4f..7bd06ec 100644
--- a/src/brpc/policy/http_rpc_protocol.cpp
+++ b/src/brpc/policy/http_rpc_protocol.cpp
@@ -67,10 +67,13 @@ DEFINE_string(http_header_of_user_ip, "", "http requests sent by proxies may "
"brpc will read ip:port from the specified header for "
"authorization and set Controller::remote_side()");
-DEFINE_bool(pb_enum_as_number, false, "[Not recommended] Convert enums in "
+DEFINE_bool(pb_enum_as_number, false,
+ "[Not recommended] Convert enums in "
"protobuf to json as numbers, affecting both client-side and "
"server-side");
+DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
+
// Read user address from the header specified by -http_header_of_user_ip
static bool GetUserAddressFromHeaderImpl(const HttpHeader& headers,
butil::EndPoint* user_addr) {
@@ -566,8 +569,10 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/,
// Fill log-id if user set it.
if (cntl->has_log_id()) {
hreq.SetHeader(common->LOG_ID,
- butil::string_printf(
- "%llu", (unsigned long long)cntl->log_id()));
+ butil::string_printf("%llu", (unsigned long long)cntl->log_id()));
+ }
+ if (!cntl->request_id().empty()) {
+ hreq.SetHeader(FLAGS_request_id_header, cntl->request_id());
}
if (!is_http2) {
@@ -1264,6 +1269,11 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
}
+ const std::string* request_id = req_header.GetHeader(FLAGS_request_id_header);
+ if (request_id) {
+ cntl->set_request_id(*request_id);
+ }
+
// Tag the bthread with this server's key for
// thread_local_data().
if (server->thread_local_options().thread_local_data_factory) {
diff --git a/src/butil/fast_rand.cpp b/src/butil/fast_rand.cpp
index 1bd80c9..36e0e83 100644
--- a/src/butil/fast_rand.cpp
+++ b/src/butil/fast_rand.cpp
@@ -178,4 +178,19 @@ void fast_rand_bytes(void* output, size_t output_length) {
}
}
+std::string fast_rand_printable(size_t length) {
+ std::string result(length, 0);
+ const size_t halflen = length/2;
+ fast_rand_bytes(&result[0], halflen);
+ for (size_t i = 0; i < halflen; ++i) {
+ const uint8_t b = result[halflen - 1 - i];
+ result[length - 1 - 2*i] = 'A' + (b & 0xF);
+ result[length - 2 - 2*i] = 'A' + (b >> 4);
+ }
+ if (halflen * 2 != length) {
+ result[0] = 'A' + (fast_rand() % 16);
+ }
+ return result;
+}
+
} // namespace butil
diff --git a/src/butil/fast_rand.h b/src/butil/fast_rand.h
index ed941d6..b2ccb4f 100644
--- a/src/butil/fast_rand.h
+++ b/src/butil/fast_rand.h
@@ -20,7 +20,9 @@
#ifndef BUTIL_FAST_RAND_H
#define BUTIL_FAST_RAND_H
+#include <cstddef>
#include <stdint.h>
+#include <string>
namespace butil {
@@ -66,7 +68,10 @@ template <typename T> T fast_rand_in(T min, T max) {
double fast_rand_double();
// Fills |output_length| bytes of |output| with random data.
-void fast_rand_bytes(void* output, size_t output_length, uint8_t min);
+void fast_rand_bytes(void *output, size_t output_length);
+
+// Generate a random printable string of |length| bytes
+std::string fast_rand_printable(size_t length);
}
diff --git a/test/brpc_controller_unittest.cpp b/test/brpc_controller_unittest.cpp
index 0def06d..f733328 100644
--- a/test/brpc_controller_unittest.cpp
+++ b/test/brpc_controller_unittest.cpp
@@ -122,7 +122,7 @@ TEST_F(ControllerTest, SessionKV) {
ASSERT_TRUE(startsWith(sink1, "W")) << sink1;
sink1.clear();
- cntl.http_request().SetHeader("x-request-id", "abcdEFG-456");
+ cntl.set_request_id("abcdEFG-456");
CLOGE(&cntl) << "My ERROR Log";
ASSERT_TRUE(endsWith(sink1, "] @rid=abcdEFG-456 My ERROR Log")) << sink1;
ASSERT_TRUE(startsWith(sink1, "E")) << sink1;
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org