You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ja...@apache.org on 2020/11/26 06:15:00 UTC

[incubator-brpc] 03/05: add Controller.SessionKV() to record and print session-level KV; Add LOGD/I/W/E/F to print contextual log; Add flag -log_as_json to print logs as valid JSON

This is an automated email from the ASF dual-hosted git repository.

jamesge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git

commit 86acfa893d8f7776a7085701cf9ae5721434e2b6
Author: jamesge <jg...@gmail.com>
AuthorDate: Thu Nov 26 14:11:20 2020 +0800

    add Controller.SessionKV() to record and print session-level KV; Add LOGD/I/W/E/F to print contextual log; Add flag -log_as_json to print logs as valid JSON
---
 src/brpc/controller.cpp             |  83 +++++++++++++++++++++++++-
 src/brpc/controller.h               |  27 +++++++++
 src/brpc/{session_log.h => kvmap.h} |  21 +++----
 src/butil/logging.cc                | 115 ++++++++++++++++++++++++++++++++----
 src/butil/logging.h                 |   1 +
 test/brpc_controller_unittest.cpp   |  55 +++++++++++++++++
 6 files changed, 278 insertions(+), 24 deletions(-)

diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index ace380f..5c061ee 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -79,7 +79,9 @@ BAIDU_REGISTER_ERRNO(brpc::EITP, "Bad Itp response");
 
 namespace brpc {
 
-DEFINE_bool(graceful_quit_on_sigterm, false, "Register SIGTERM handle func to quit graceful");
+DEFINE_bool(graceful_quit_on_sigterm, false,
+            "Register SIGTERM handle func to quit graceful");
+DEFINE_string(request_id_header, "x-request-id", "The http header to mark a session");
 
 const IdlNames idl_single_req_single_res = { "req", "res" };
 const IdlNames idl_single_req_multi_res = { "req", "" };
@@ -128,6 +130,7 @@ Controller::Controller() {
 
 Controller::~Controller() {
     *g_ncontroller << -1;
+    FlushSessionKV(LOG_STREAM(INFO));
     ResetNonPods();
 }
 
@@ -1485,4 +1488,82 @@ google::protobuf::Closure* DoNothing() {
     return butil::get_leaky_singleton<DoNothingClosure>();
 }
 
+KVMap& Controller::SessionKV() {
+    if (_session_kv == nullptr) {
+        _session_kv.reset(new KVMap);
+    }
+    return *_session_kv.get();
+}
+
+#define BRPC_SESSION_END_MSG "Session ends"
+#define BRPC_REQ_ID "@rid"
+#define BRPC_KV_SEP ":"
+
+void Controller::FlushSessionKV(std::ostream& os) {
+    if (_session_kv == nullptr || _session_kv->Count() == 0) {
+        return;
+    }
+
+    const std::string* pRID = nullptr;
+    if (_http_request) {
+        pRID = _http_request->GetHeader(FLAGS_request_id_header);
+    }
+
+    if (logging::FLAGS_log_as_json) {
+        os << "\"M\":\"" BRPC_SESSION_END_MSG "\"";
+        if (pRID) {
+            os << ",\"" BRPC_REQ_ID "\":\"" << *pRID << '"';
+        }
+        for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) {
+            os << ",\"" << it->first << "\":\"" << it->second << '"';
+        }
+    } else {
+        os << BRPC_SESSION_END_MSG;
+        if (pRID) {
+            os << " " BRPC_REQ_ID BRPC_KV_SEP << *pRID;
+        }
+        for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) {
+            os << ' ' << it->first << BRPC_KV_SEP << it->second;
+        }
+    }
+}
+
+Controller::LogPostfixDummy::~LogPostfixDummy() {
+    *osptr << postfix;
+}
+
+std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p) {
+    const_cast<brpc::Controller::LogPostfixDummy&>(p).osptr = &os;
+    if (logging::FLAGS_log_as_json) {
+        os << "\"M\":\"";
+    }
+    return os;
+}
+
+
+Controller::LogPostfixDummy Controller::LogPostfix() const {
+    Controller::LogPostfixDummy result;
+    std::string& p = result.postfix;
+    if (logging::FLAGS_log_as_json) {
+        p.push_back('"');
+    }
+    const std::string* pRID = nullptr;
+    if (_http_request) {
+        pRID = _http_request->GetHeader(FLAGS_request_id_header);
+        if (pRID) {
+            if (logging::FLAGS_log_as_json) {
+                p.append(",\"" BRPC_REQ_ID "\":\"");
+                p.append(*pRID);
+                p.push_back('"');
+            } else {
+                p.reserve(5 + pRID->size());
+                p.append(" " BRPC_REQ_ID BRPC_KV_SEP);
+                p.append(*pRID);
+            }
+
+        }
+    }
+    return result;
+}
+
 } // namespace brpc
diff --git a/src/brpc/controller.h b/src/brpc/controller.h
index 19a352e..fc39e52 100755
--- a/src/brpc/controller.h
+++ b/src/brpc/controller.h
@@ -43,6 +43,7 @@
 #include "brpc/progressive_attachment.h"       // ProgressiveAttachment
 #include "brpc/progressive_reader.h"           // ProgressiveReader
 #include "brpc/grpc.h"
+#include "brpc/kvmap.h"
 
 // EAUTH is defined in MAC
 #ifndef EAUTH
@@ -482,6 +483,19 @@ public:
     const butil::IOBuf& request_attachment() const { return _request_attachment; }
     const butil::IOBuf& response_attachment() const { return _response_attachment; }
 
+    // Get the object to write key/value which will be flushed into
+    // LOG(INFO) when this controller is deleted.
+    KVMap& SessionKV();
+
+    // Contextual prefixes for LOGD/LOGI/LOGW/LOGE/LOGF macros
+    struct LogPostfixDummy {
+        LogPostfixDummy() : osptr(nullptr) {}
+        ~LogPostfixDummy();
+        std::string postfix;
+        std::ostream* osptr;
+    };
+    LogPostfixDummy LogPostfix() const;
+
     // Return true if the remote side creates a stream.
     bool has_remote_stream() { return _remote_stream_settings != NULL; }
 
@@ -660,6 +674,9 @@ private:
     std::string& protocol_param() { return _thrift_method_name; }
     const std::string& protocol_param() const { return _thrift_method_name; }
 
+    // Flush this->SessionKV() into `os'
+    void FlushSessionKV(std::ostream& os);
+
 private:
     // NOTE: align and group fields to make Controller as compact as possible.
 
@@ -739,6 +756,8 @@ private:
     HttpHeader* _http_request;
     HttpHeader* _http_response;
 
+    std::unique_ptr<KVMap> _session_kv;
+
     // Fields with large size but low access frequency 
     butil::IOBuf _request_attachment;
     butil::IOBuf _response_attachment;
@@ -787,7 +806,15 @@ bool IsAskedToQuit();
 // Send Ctrl-C to current process.
 void AskToQuit();
 
+std::ostream& operator<<(std::ostream& os, const Controller::LogPostfixDummy& p);
+
 } // namespace brpc
 
+// Print contextual logs
+#define LOGD(cntl) LOG(DEBUG) << (cntl)->LogPostfix()
+#define LOGI(cntl) LOG(INFO) << (cntl)->LogPostfix()
+#define LOGW(cntl) LOG(WARNING) << (cntl)->LogPostfix()
+#define LOGE(cntl) LOG(ERROR) << (cntl)->LogPostfix()
+#define LOGF(cntl) LOG(FATAL) << (cntl)->LogPostfix()
 
 #endif  // BRPC_CONTROLLER_H
diff --git a/src/brpc/session_log.h b/src/brpc/kvmap.h
similarity index 85%
rename from src/brpc/session_log.h
rename to src/brpc/kvmap.h
index 049232a..1fd7041 100644
--- a/src/brpc/session_log.h
+++ b/src/brpc/kvmap.h
@@ -15,28 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef BRPC_SESSION_LOG_H
-#define BRPC_SESSION_LOG_H
+#ifndef BRPC_KVMAP_H
+#define BRPC_KVMAP_H
 
 #include "butil/containers/flat_map.h"
 
 namespace brpc {
     
-class SessionLog {
+// Remember Key/Values in string
+class KVMap {
 public:
-    class Formatter {
-    public:
-        virtual ~Formatter() {}
-        virtual void Print(std::ostream&, const SessionLog&) = 0;
-    };
-
     typedef butil::FlatMap<std::string, std::string> Map;
     typedef Map::const_iterator Iterator;
 
-    SessionLog() {}
+    KVMap() {}
 
-    // Exchange internal fields with another SessionLog.
-    void Swap(SessionLog &rhs) { _entries.swap(rhs._entries); }
+    // Exchange internal fields with another KVMap.
+    void Swap(KVMap &rhs) { _entries.swap(rhs._entries); }
 
     // Reset internal fields as if they're just default-constructed.
     void Clear() { _entries.clear(); }
@@ -77,4 +72,4 @@ private:
 
 } // namespace brpc
 
-#endif // BRPC_SESSION_LOG_H
+#endif // BRPC_KVMAP_H
diff --git a/src/butil/logging.cc b/src/butil/logging.cc
index 39b2a37..26e6259 100644
--- a/src/butil/logging.cc
+++ b/src/butil/logging.cc
@@ -127,7 +127,7 @@ DEFINE_string(vmodule, "", "per-module verbose level."
               " (that is, name ignoring .cpp/.h)."
               " LOG_LEVEL overrides any value given by --v.");
 
-DEFINE_bool(log_process_id, false, "Log process id");
+DEFINE_bool(log_pid, false, "Log process id");
 
 DEFINE_int32(minloglevel, 0, "Any log at or above this level will be "
              "displayed. Anything below this level will be silently ignored. "
@@ -139,6 +139,8 @@ DEFINE_bool(log_hostname, false, "Add host after pid in each log so"
 
 DEFINE_bool(log_year, false, "Log year in datetime part in each log");
 
+DEFINE_bool(log_as_json, false, "Print log as a valid JSON");
+
 namespace {
 
 LoggingDestination logging_destination = LOG_DEFAULT;
@@ -453,7 +455,7 @@ void SetLogAssertHandler(LogAssertHandler handler) {
 const char* const log_severity_names[LOG_NUM_SEVERITIES] = {
     "INFO", "NOTICE", "WARNING", "ERROR", "FATAL" };
 
-inline void log_severity_name(std::ostream& os, int severity) {
+static void PrintLogSeverity(std::ostream& os, int severity) {
     if (severity < 0) {
         // Add extra space to separate from following datetime.
         os << 'V' << -severity << ' ';
@@ -464,9 +466,9 @@ inline void log_severity_name(std::ostream& os, int severity) {
     }
 }
 
-void print_log_prefix(std::ostream& os,
-                      int severity, const char* file, int line) {
-    log_severity_name(os, severity);
+static void PrintLogPrefix(
+    std::ostream& os, int severity, const char* file, int line) {
+    PrintLogSeverity(os, severity);
 #if defined(OS_LINUX)
     timeval tv;
     gettimeofday(&tv, NULL);
@@ -492,7 +494,7 @@ void print_log_prefix(std::ostream& os,
 #if defined(OS_LINUX)
     os << '.' << std::setw(6) << tv.tv_usec;
 #endif
-    if (FLAGS_log_process_id) {
+    if (FLAGS_log_pid) {
         os << ' ' << std::setfill(' ') << std::setw(5) << CurrentProcessId();
     }
     os << ' ' << std::setfill(' ') << std::setw(5)
@@ -508,6 +510,62 @@ void print_log_prefix(std::ostream& os,
     os.fill(prev_fill);
 }
 
+static void PrintLogPrefixAsJSON(
+    std::ostream& os, int severity, const char* file, int line) {
+    // severity
+    os << "\"L\":\"";
+    if (severity < 0) {
+        os << 'V' << -severity;
+    } else if (severity < LOG_NUM_SEVERITIES) {
+        os << log_severity_names[severity][0];
+    } else {
+        os << 'U';
+    }
+    // time
+    os << "\",\"T\":\"";
+#if defined(OS_LINUX)
+    timeval tv;
+    gettimeofday(&tv, NULL);
+    time_t t = tv.tv_sec;
+#else
+    time_t t = time(NULL);
+#endif
+    struct tm local_tm = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, NULL};
+#if _MSC_VER >= 1400
+    localtime_s(&local_tm, &t);
+#else
+    localtime_r(&t, &local_tm);
+#endif
+    const char prev_fill = os.fill('0');
+    if (FLAGS_log_year) {
+        os << std::setw(4) << local_tm.tm_year + 1900;
+    }
+    os << std::setw(2) << local_tm.tm_mon + 1
+       << std::setw(2) << local_tm.tm_mday << ' '
+       << std::setw(2) << local_tm.tm_hour << ':'
+       << std::setw(2) << local_tm.tm_min << ':'
+       << std::setw(2) << local_tm.tm_sec;
+#if defined(OS_LINUX)
+    os << '.' << std::setw(6) << tv.tv_usec;
+#endif
+    os << "\",";
+    os.fill(prev_fill);
+
+    if (FLAGS_log_pid) {
+        os << "\"pid\":\"" << CurrentProcessId() << "\",";
+    }
+    os << "\"tid\":\"" << butil::PlatformThread::CurrentId() << "\",";
+    if (FLAGS_log_hostname) {
+        butil::StringPiece hostname(butil::my_hostname());
+        if (hostname.ends_with(".baidu.com")) { // make it shorter
+            hostname.remove_suffix(10);
+        }
+        os << "\"host\":\"" << hostname << "\",";
+    }
+    os << "\"C\":\"" << file << ':' << line << "\"";
+}
+
+
 // A log message handler that gets notified of every log message we process.
 class DoublyBufferedLogSink : public butil::DoublyBufferedData<LogSink*> {
 public:
@@ -612,13 +670,32 @@ void DisplayDebugMessageInDialog(const std::string& str) {
 bool StringSink::OnLogMessage(int severity, const char* file, int line, 
                               const butil::StringPiece& content) {
     std::ostringstream prefix_os;
-    print_log_prefix(prefix_os, severity, file, line);
+    bool pair_quote = false;
+    if (FLAGS_log_as_json) {
+        prefix_os << '{';
+        PrintLogPrefixAsJSON(prefix_os, severity, file, line);
+        if (content.empty() || content[0] != '"') {
+            // not a json, add 'M' field
+            prefix_os << ",\"M\":\"";
+            pair_quote = true;
+        } else {
+            prefix_os << ',';
+        }
+    } else {
+        PrintLogPrefix(prefix_os, severity, file, line);
+    }
     const std::string prefix = prefix_os.str();
     {
         butil::AutoLock lock_guard(_lock);
         reserve(size() + prefix.size() + content.size());
         append(prefix);
         append(content.data(), content.size());
+        if (FLAGS_log_as_json) {
+            if (pair_quote) {
+                push_back('"');
+            }
+            push_back('}');
+        }
     }
     return true;
 }
@@ -772,9 +849,27 @@ public:
         // A LogSink focused on performance should also be able to handle
         // non-continuous inputs which is a must to maximize performance.
         std::ostringstream os;
-        print_log_prefix(os, severity, file, line);
-        os.write(content.data(), content.size());
-        os << '\n';
+        if (!FLAGS_log_as_json) {
+            PrintLogPrefix(os, severity, file, line);
+            os.write(content.data(), content.size());
+            os << '\n';
+        } else {
+            os << '{';
+            PrintLogPrefixAsJSON(os, severity, file, line);
+            bool pair_quote = false;
+            if (content.empty() || content[0] != '"') {
+                // not a json, add a 'M' field
+                os << ",\"M\":\"";
+                pair_quote = true;
+            } else {
+                os << ',';
+            }
+            os.write(content.data(), content.size());
+            if (pair_quote) {
+                os << '"';
+            }
+            os << "}\n";
+        }
         std::string log = os.str();
         
         if ((logging_destination & LOG_TO_SYSTEM_DEBUG_LOG) != 0) {
diff --git a/src/butil/logging.h b/src/butil/logging.h
index f92cd78..20754b0 100644
--- a/src/butil/logging.h
+++ b/src/butil/logging.h
@@ -409,6 +409,7 @@ const LogSeverity BLOG_0 = BLOG_ERROR;
 #define VLOG_IS_ON(verbose_level) BAIDU_VLOG_IS_ON(verbose_level, __FILE__)
 
 DECLARE_int32(v);
+DECLARE_bool(log_as_json);
 
 extern const int VLOG_UNINITIALIZED;
 
diff --git a/test/brpc_controller_unittest.cpp b/test/brpc_controller_unittest.cpp
index ea628ef..bf79b97 100644
--- a/test/brpc_controller_unittest.cpp
+++ b/test/brpc_controller_unittest.cpp
@@ -21,6 +21,7 @@
 
 #include <gtest/gtest.h>
 #include <google/protobuf/stubs/common.h>
+#include "butil/logging.h"
 #include "butil/time.h"
 #include "butil/macros.h"
 #include "brpc/socket.h"
@@ -72,3 +73,57 @@ TEST_F(ControllerTest, notify_on_destruction) {
     delete cntl;
     ASSERT_TRUE(cancel);
 }
+
+/*
+class MyFormatter : public brpc::SessionLog::Formatter {
+    void Print(std::ostream& os, const brpc::SessionLog& log) override {
+        for (auto it = log.Begin(); it != log.End(); ++it) {
+            os << '"' << it->first << "\":\"" << it->second << "\",";
+        }
+    }
+};
+*/
+
+static bool endsWith(const std::string& s1, const butil::StringPiece& s2)  {
+    if (s1.size() < s2.size()) {
+        return false;
+    }
+    return memcmp(s1.data() + s1.size() - s2.size(), s2.data(), s2.size()) == 0;
+}
+static bool startsWith(const std::string& s1, const butil::StringPiece& s2)  {
+    if (s1.size() < s2.size()) {
+        return false;
+    }
+    return memcmp(s1.data(), s2.data(), s2.size()) == 0;
+}
+
+TEST_F(ControllerTest, SessionKV) {
+    logging::FLAGS_log_as_json = false;
+    logging::StringSink sink1;
+    auto oldSink = logging::SetLogSink(&sink1);
+    //brpc::SetGlobalSessionLogFormatter(new MyFormatter);
+    {
+        brpc::Controller cntl;
+        cntl.set_log_id(123); // not working now
+        cntl.SessionKV().Set("Apple", 1);    
+        cntl.SessionKV().Set("Baidu", "22");
+        cntl.SessionKV().Set("Cisco", 33.3);
+
+        LOGW(&cntl) << "My WARNING Log";
+        ASSERT_TRUE(endsWith(sink1, "] My WARNING Log")) << sink1;
+        ASSERT_TRUE(startsWith(sink1, "W")) << sink1;
+        sink1.clear();
+
+        cntl.http_request().SetHeader("x-request-id", "abcdEFG-456");
+        LOGE(&cntl) << "My ERROR Log";
+        ASSERT_TRUE(endsWith(sink1, "] My ERROR Log @rid:abcdEFG-456")) << sink1;
+        ASSERT_TRUE(startsWith(sink1, "E")) << sink1;
+        sink1.clear();
+
+        logging::FLAGS_log_as_json = true;
+    }
+    ASSERT_TRUE(endsWith(sink1, R"(,"M":"Session ends","@rid":"abcdEFG-456","Baidu":"22","Cisco":"33.300000","Apple":"1"})")) << sink1;
+    ASSERT_TRUE(startsWith(sink1, R"({"L":"I",)")) << sink1;
+
+    logging::SetLogSink(oldSink);
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org