You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/12/05 17:46:47 UTC

[3/5] kudu git commit: KUDU-695. Avoid glog contention by deferring log writes to dedicated threads

KUDU-695. Avoid glog contention by deferring log writes to dedicated threads

This patch changes the logging init sequence to install a wrapper around
the built-in glog Loggers. The wrapper starts a new thread which does
double-buffering: log entries are buffered in a vector, and the thread
is woken up. The thread swaps in a clean buffer to avoid delaying
application threads while it flushes the original buffer to disk.

It's hard to test the end-to-end integration in a unit test, since the
unit tests disable logging to files, and this path only affects
file-based logging. However, I ran an earlier version of this patch in a
stress test environment and it seemed to reduce the frequency with which
I saw threads blocked on glog.

The patch does, however, have a test which exercises the new code paths,
including the blocking path. I looped the new test 500 times in TSAN
mode with success.

The new feature is enabled by default, but I left a hidden flag to
disable it in case we have any issues.

Change-Id: Ie22a0a29fa00a988a53a15d2c726ce5d49018f4d
Reviewed-on: http://gerrit.cloudera.org:8080/5321
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/85b86d12
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/85b86d12
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/85b86d12

Branch: refs/heads/master
Commit: 85b86d12c7ee1238a23f7745bb477b66b000a236
Parents: 916c21b
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Dec 1 22:12:13 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Dec 5 17:34:10 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt             |   1 +
 src/kudu/util/async_logger.cc            | 149 +++++++++++++++++++
 src/kudu/util/async_logger.h             | 202 ++++++++++++++++++++++++++
 src/kudu/util/debug/leakcheck_disabler.h |   1 -
 src/kudu/util/logging-test.cc            |  81 ++++++++++-
 src/kudu/util/logging.cc                 |  29 ++++
 6 files changed, 461 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 0977461..a89cc0b 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -108,6 +108,7 @@ if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
 endif()
 
 set(UTIL_SRCS
+  async_logger.cc
   atomic.cc
   bitmap.cc
   bloom_filter.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc
new file mode 100644
index 0000000..b6456de
--- /dev/null
+++ b/src/kudu/util/async_logger.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_logger.h"
+
+#include <algorithm>
+#include <string>
+#include <thread>
+
+#include "kudu/util/locks.h"
+
+using std::string;
+
+namespace kudu {
+
+AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
+                         int max_buffer_bytes) :
+    max_buffer_bytes_(max_buffer_bytes),
+    wrapped_(DCHECK_NOTNULL(wrapped)),
+    wake_flusher_cond_(&lock_),
+    free_buffer_cond_(&lock_),
+    flush_complete_cond_(&lock_),
+    active_buf_(new Buffer()),
+    flushing_buf_(new Buffer()) {
+  DCHECK_GT(max_buffer_bytes_, 0);
+}
+
+AsyncLogger::~AsyncLogger() {}
+
+void AsyncLogger::Start() {
+  CHECK_EQ(state_, INITTED);
+  state_ = RUNNING;
+  thread_ = std::thread(&AsyncLogger::RunThread, this);
+}
+
+void AsyncLogger::Stop() {
+  {
+    MutexLock l(lock_);
+    CHECK_EQ(state_, RUNNING);
+    state_ = STOPPED;
+    wake_flusher_cond_.Signal();
+  }
+  thread_.join();
+  CHECK(active_buf_->messages.empty());
+  CHECK(flushing_buf_->messages.empty());
+}
+
+void AsyncLogger::Write(bool force_flush,
+                        time_t timestamp,
+                        const char* message,
+                        int message_len) {
+  {
+    MutexLock l(lock_);
+    DCHECK_EQ(state_, RUNNING);
+    while (BufferFull(*active_buf_)) {
+      app_threads_blocked_count_for_tests_++;
+      free_buffer_cond_.Wait();
+    }
+    active_buf_->add(Msg(timestamp, string(message, message_len)),
+                     force_flush);
+    wake_flusher_cond_.Signal();
+  }
+
+  // In most cases, we take the 'force_flush' argument to mean that we'll let the logger
+  // thread do the flushing for us, but not block the application. However, for the
+  // special case of a FATAL log message, we really want to make sure that our message
+  // hits the log before we continue, or else it's likely that the application will exit
+  // while it's still in our buffer.
+  //
+  // NOTE: even if the application doesn't wrap the FATAL-level logger, log messages at
+  // FATAL are also written to all other log files with lower levels. So, a FATAL message
+  // will force a synchronous flush of all lower-level logs before exiting.
+  //
+  // Unfortunately, the underlying log level isn't passed through to this interface, so we
+  // have to use this hack: messages from FATAL errors start with the character 'F'.
+  if (message_len > 0 && message[0] == 'F') {
+    Flush();
+  }
+}
+
+void AsyncLogger::Flush() {
+  MutexLock l(lock_);
+  DCHECK_EQ(state_, RUNNING);
+
+  // Wake up the writer thread at least twice.
+  // This ensures that it has completely flushed both buffers.
+  uint64_t orig_flush_count = flush_count_;
+  while (flush_count_ < orig_flush_count + 2 &&
+         state_ == RUNNING) {
+    active_buf_->flush = true;
+    wake_flusher_cond_.Signal();
+    flush_complete_cond_.Wait();
+  }
+}
+
+uint32 AsyncLogger::LogSize() {
+  return wrapped_->LogSize();
+}
+
+void AsyncLogger::RunThread() {
+  MutexLock l(lock_);
+  while (state_ == RUNNING || active_buf_->needs_flush_or_write()) {
+    while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) {
+      wake_flusher_cond_.Wait();
+    }
+
+    active_buf_.swap(flushing_buf_);
+    // If the buffer that we are about to flush was full, then
+    // we may have other threads which were blocked that we now
+    // need to wake up.
+    if (BufferFull(*flushing_buf_)) {
+      free_buffer_cond_.Broadcast();
+    }
+    l.Unlock();
+
+    for (const auto& msg : flushing_buf_->messages) {
+      wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size());
+    }
+    if (flushing_buf_->flush) {
+      wrapped_->Flush();
+    }
+    flushing_buf_->clear();
+
+    l.Lock();
+    flush_count_++;
+    flush_complete_cond_.Broadcast();
+  }
+}
+
+bool AsyncLogger::BufferFull(const Buffer& buf) const {
+  // We evenly divide our total buffer space between the two buffers.
+  return buf.size > (max_buffer_bytes_ / 2);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.h b/src/kudu/util/async_logger.h
new file mode 100644
index 0000000..2804a9b
--- /dev/null
+++ b/src/kudu/util/async_logger.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Wrapper for a glog Logger which asynchronously writes log messages.
+// This class starts a new thread responsible for forwarding the messages
+// to the logger, and performs double buffering. Writers append to the
+// current buffer and then wake up the logger thread. The logger swaps in
+// a new buffer and writes any accumulated messages to the wrapped
+// Logger.
+//
+// This double-buffering behavior dramatically improves performance, especially
+// for logging messages which require flushing the underlying file (i.e. WARNING
+// and above by default). The flush can take a couple of milliseconds, and in
+// some cases can even block for hundreds of milliseconds or more. With the
+// double-buffered approach, threads can proceed with useful work while the IO
+// thread blocks.
+//
+// The semantics provided by this wrapper are slightly weaker than the default
+// glog semantics. By default, glog will immediately (synchronously) flush WARNING
+// and above to the underlying file, whereas here we are deferring that flush to
+// the separate thread. This means that a crash just after a 'LOG(WARNING)'
+// may lose that message from the logs, but the perf benefit is probably
+// worth it. We do take care that a glog FATAL message flushes all buffered log
+// messages before exiting.
+//
+// NOTE: the logger limits the total amount of buffer space, so if the underlying
+// log blocks for too long, eventually the threads generating the log messages
+// will block as well. This prevents runaway memory usage.
+class AsyncLogger : public google::base::Logger {
+ public:
+  AsyncLogger(google::base::Logger* wrapped,
+              int max_buffer_bytes);
+  ~AsyncLogger();
+
+  void Start();
+
+  // Stop the thread. Flush() and Write() must not be called after this.
+  //
+  // NOTE: this is currently only used in tests: in real life, we enable async
+  // logging once when the program starts and then never disable it.
+  //
+  // REQUIRES: Start() must have been called.
+  void Stop();
+
+  // Write a message to the log.
+  //
+  // 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
+  // flag. Any messages logged at the configured level or higher result in 'force_flush'
+  // being set to true, indicating that the message should be immediately written to the
+  // log rather than buffered in memory. See the class-level docs above for more detail
+  // about the implementation provided here.
+  //
+  // REQUIRES: Start() must have been called.
+  void Write(bool force_flush,
+             time_t timestamp,
+             const char* message,
+             int message_len) override;
+
+  // Flush any buffered messages.
+  void Flush() override;
+
+  // Get the current LOG file size.
+  // The returned value is approximate since some
+  // logged data may not have been flushed to disk yet.
+  uint32 LogSize() override;
+
+  // Return a count of how many times an application thread was
+  // blocked due to the buffers being full and the writer thread
+  // not keeping up.
+  int app_threads_blocked_count_for_tests() const {
+    MutexLock l(lock_);
+    return app_threads_blocked_count_for_tests_;
+  }
+
+ private:
+  // A buffered message.
+  //
+  // TODO(todd): using std::string for buffered messages is convenient but not
+  // as efficient as it could be. Better would be to make the buffers just be
+  // Arenas and allocate both the message data and Msg struct from them, forming
+  // a linked list.
+  struct Msg {
+    time_t ts;
+    std::string message;
+
+    Msg(time_t ts, std::string message)
+        : ts(ts),
+          message(std::move(message)) {
+    }
+  };
+
+  // A buffer of messages waiting to be flushed.
+  struct Buffer {
+    std::vector<Msg> messages;
+
+    // Estimate of the size of 'messages'.
+    int size = 0;
+
+    // Whether this buffer needs an explicit flush of the
+    // underlying logger.
+    bool flush = false;
+
+    Buffer() {}
+
+    void clear() {
+      messages.clear();
+      size = 0;
+      flush = false;
+    }
+
+    void add(Msg msg, bool flush) {
+      size += sizeof(msg) + msg.message.size();
+      messages.emplace_back(std::move(msg));
+      this->flush |= flush;
+    }
+
+    bool needs_flush_or_write() const {
+      return flush || !messages.empty();
+    }
+
+   private:
+    DISALLOW_COPY_AND_ASSIGN(Buffer);
+  };
+
+  bool BufferFull(const Buffer& buf) const;
+  void RunThread();
+
+  // The maximum number of bytes used by the entire class.
+  const int max_buffer_bytes_;
+  google::base::Logger* const wrapped_;
+  std::thread thread_;
+
+  // Count of how many times an application thread was blocked due to
+  // a full buffer.
+  int app_threads_blocked_count_for_tests_ = 0;
+
+  // Count of how many times the writer thread has flushed the buffers.
+  // 64 bits should be enough to never worry about overflow.
+  uint64_t flush_count_ = 0;
+
+  // Protects buffers as well as 'state_'.
+  mutable Mutex lock_;
+
+  // Signaled by app threads to wake up the flusher, either for new
+  // data or because 'state_' changed.
+  ConditionVariable wake_flusher_cond_;
+
+  // Signaled by the flusher thread when the flusher has swapped in
+  // a free buffer to write to.
+  ConditionVariable free_buffer_cond_;
+
+  // Signaled by the flusher thread when it has completed flushing
+  // the current buffer.
+  ConditionVariable flush_complete_cond_;
+
+  // The buffer to which application threads append new log messages.
+  std::unique_ptr<Buffer> active_buf_;
+
+  // The buffer currently being flushed by the logger thread, cleared
+  // after a successful flush.
+  std::unique_ptr<Buffer> flushing_buf_;
+
+  // Trigger for the logger thread to stop.
+  enum State {
+    INITTED,
+    RUNNING,
+    STOPPED
+  };
+  State state_ = INITTED;
+
+  DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/debug/leakcheck_disabler.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug/leakcheck_disabler.h b/src/kudu/util/debug/leakcheck_disabler.h
index bab724f..815f818 100644
--- a/src/kudu/util/debug/leakcheck_disabler.h
+++ b/src/kudu/util/debug/leakcheck_disabler.h
@@ -17,7 +17,6 @@
 #ifndef KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
 #define KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
 
-#include <gperftools/heap-checker.h>
 #include "kudu/gutil/macros.h"
 #include "kudu/util/debug/leak_annotations.h"
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging-test.cc b/src/kudu/util/logging-test.cc
index 85ad3cd..08fc27d 100644
--- a/src/kudu/util/logging-test.cc
+++ b/src/kudu/util/logging-test.cc
@@ -18,11 +18,17 @@
 #include <glog/logging.h>
 #include <gmock/gmock.h>
 #include <string>
+#include <thread>
 #include <vector>
 
-#include "kudu/util/logging_test_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
 
 using std::string;
 using std::vector;
@@ -81,4 +87,77 @@ TEST(LoggingTest, TestAdvancedThrottling) {
   EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
 }
 
+// Test Logger implementation that just counts the number of messages
+// and flushes.
+//
+// This is purposefully thread-unsafe because we expect that the
+// AsyncLogger is only accessing the underlying logger from a single
+// thread.
+class CountingLogger : public google::base::Logger {
+ public:
+  void Write(bool force_flush,
+             time_t /*timestamp*/,
+             const char* /*message*/,
+             int /*message_len*/) override {
+    message_count_++;
+    if (force_flush) {
+      Flush();
+    }
+  }
+
+  void Flush() override {
+    // Simulate a slow disk.
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    flush_count_++;
+  }
+
+  uint32 LogSize() override {
+    return 0;
+  }
+
+  int flush_count_ = 0;
+  int message_count_ = 0;
+};
+
+TEST(LoggingTest, TestAsyncLogger) {
+  const int kNumThreads = 4;
+  const int kNumMessages = 10000;
+  const int kBuffer = 10000;
+  CountingLogger base;
+  AsyncLogger async(&base, kBuffer);
+  async.Start();
+
+  vector<std::thread> threads;
+  Barrier go_barrier(kNumThreads + 1);
+  // Start some threads writing log messages.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+        go_barrier.Wait();
+        for (int m = 0; m < kNumMessages; m++) {
+          async.Write(true, m, "x", 1);
+        }
+      });
+  }
+
+  // And a thread calling Flush().
+  threads.emplace_back([&]() {
+      go_barrier.Wait();
+      for (int i = 0; i < 10; i++) {
+        async.Flush();
+        SleepFor(MonoDelta::FromMilliseconds(3));
+      }
+    });
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  async.Stop();
+  ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads);
+  // The async logger should only flush once per "batch" rather than
+  // once per message, even though we wrote every message with
+  // 'flush' set to true.
+  ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads);
+  ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 1032f97..d89988c 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -28,7 +28,9 @@
 
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/spinlock.h"
+#include "kudu/util/async_logger.h"
 #include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
 #include "kudu/util/flag_tags.h"
 
 DEFINE_string(log_filename, "",
@@ -36,6 +38,16 @@ DEFINE_string(log_filename, "",
     "full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]");
 TAG_FLAG(log_filename, stable);
 
+DEFINE_bool(log_async, true,
+            "Enable asynchronous writing to log files. This improves "
+            "latency and stability.");
+TAG_FLAG(log_async, hidden);
+
+DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+             "The number of bytes of buffer space used by each log "
+             "level. Only relevant when --log_async is enabled.");
+TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
+
 #define PROJ_NAME "kudu"
 
 bool logging_initialized = false;
@@ -100,6 +112,18 @@ SimpleSink* registered_sink = nullptr;
 // Protected by 'logging_mutex'.
 int initial_stderr_severity;
 
+void EnableAsyncLogging() {
+  debug::ScopedLeakCheckDisabler leaky;
+
+  // Enable Async for every level except for FATAL. Fatal should be synchronous
+  // to ensure that we get the fatal log message written before exiting.
+  for (auto level : { google::INFO, google::WARNING, google::ERROR }) {
+    auto* orig = google::base::GetLogger(level);
+    auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level);
+    async->Start();
+    google::base::SetLogger(level, async);
+  }
+}
 
 void UnregisterLoggingCallbackUnlocked() {
   CHECK(logging_mutex.IsHeld());
@@ -219,6 +243,11 @@ void InitGoogleLoggingSafe(const char* arg) {
   // Stderr logging threshold: FLAGS_stderrthreshold.
   // Sink logging: off.
   initial_stderr_severity = FLAGS_stderrthreshold;
+
+  if (FLAGS_log_async) {
+    EnableAsyncLogging();
+  }
+
   logging_initialized = true;
 }