You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/12/05 17:46:47 UTC
[3/5] kudu git commit: KUDU-695. Avoid glog contention by deferring
log writes to dedicated threads
KUDU-695. Avoid glog contention by deferring log writes to dedicated threads
This patch changes the logging init sequence to install a wrapper around
the built-in glog Loggers. The wrapper starts a new thread which does
double-buffering: log entries are buffered in a vector, and the thread
is woken up. The thread swaps in a clean buffer to avoid delaying
application threads while it flushes the original buffer to disk.
It's hard to test the end-to-end integration in a unit test, since the
unit tests disable logging to files, and this path only affects
file-based logging. However, I ran an earlier version of this patch in a
stress test environment and it seemed to reduce the frequency with which
I saw threads blocked on glog.
The patch does, however, have a test which exercises the new code paths,
including the blocking path. I looped the new test 500 times in TSAN
mode with success.
The new feature is enabled by default, but I left a hidden flag to
disable it in case we have any issues.
Change-Id: Ie22a0a29fa00a988a53a15d2c726ce5d49018f4d
Reviewed-on: http://gerrit.cloudera.org:8080/5321
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/85b86d12
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/85b86d12
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/85b86d12
Branch: refs/heads/master
Commit: 85b86d12c7ee1238a23f7745bb477b66b000a236
Parents: 916c21b
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Dec 1 22:12:13 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Dec 5 17:34:10 2016 +0000
----------------------------------------------------------------------
src/kudu/util/CMakeLists.txt | 1 +
src/kudu/util/async_logger.cc | 149 +++++++++++++++++++
src/kudu/util/async_logger.h | 202 ++++++++++++++++++++++++++
src/kudu/util/debug/leakcheck_disabler.h | 1 -
src/kudu/util/logging-test.cc | 81 ++++++++++-
src/kudu/util/logging.cc | 29 ++++
6 files changed, 461 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 0977461..a89cc0b 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -108,6 +108,7 @@ if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
endif()
set(UTIL_SRCS
+ async_logger.cc
atomic.cc
bitmap.cc
bloom_filter.cc
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc
new file mode 100644
index 0000000..b6456de
--- /dev/null
+++ b/src/kudu/util/async_logger.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_logger.h"
+
+#include <algorithm>
+#include <string>
+#include <thread>
+
+#include "kudu/util/locks.h"
+
+using std::string;
+
+namespace kudu {
+
+// Wraps 'wrapped', which the caller continues to own (it is never deleted
+// by this class). 'max_buffer_bytes' bounds the memory used by the active and
+// flushing buffers together; BufferFull() gives each buffer half of it.
+AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
+                         int max_buffer_bytes)
+    : max_buffer_bytes_(max_buffer_bytes),
+      wrapped_(DCHECK_NOTNULL(wrapped)),
+      wake_flusher_cond_(&lock_),
+      free_buffer_cond_(&lock_),
+      flush_complete_cond_(&lock_),
+      active_buf_(new Buffer()),
+      flushing_buf_(new Buffer()) {
+  // Use CHECK rather than DCHECK: the budget can come from a runtime flag, and
+  // a value <= -2 makes BufferFull() true even for an empty buffer. In a
+  // release build every Write() would then block forever, because the flusher
+  // never wakes up for an empty, non-flush buffer.
+  CHECK_GT(max_buffer_bytes_, 0);
+}
+
+// If the flusher thread was started but never stopped, stop it here.
+// 'thread_' is only joinable between Start() and Stop(), because Stop() joins
+// it. Destroying a joinable std::thread would call std::terminate(), so we
+// drain and join instead.
+AsyncLogger::~AsyncLogger() {
+  if (thread_.joinable()) {
+    Stop();
+  }
+}
+
+// Moves the logger from INITTED to RUNNING and launches the flusher thread.
+// 'state_' is updated before the thread is spawned, so the thread sees
+// RUNNING on its first loop check.
+void AsyncLogger::Start() {
+  CHECK_EQ(state_, INITTED);
+  state_ = RUNNING;
+  thread_ = std::thread([this]() { RunThread(); });
+}
+
+// Requests shutdown, waits for the flusher thread to drain everything it was
+// holding, then verifies that nothing was left behind in either buffer.
+void AsyncLogger::Stop() {
+  {
+    // Flip the state under the lock and poke the flusher so it notices.
+    MutexLock guard(lock_);
+    CHECK_EQ(state_, RUNNING);
+    state_ = STOPPED;
+    wake_flusher_cond_.Signal();
+  }
+
+  // RunThread() keeps looping while the active buffer still needs work, so
+  // after join() both buffers are expected to be empty.
+  thread_.join();
+  CHECK(active_buf_->messages.empty());
+  CHECK(flushing_buf_->messages.empty());
+}
+
+// Appends one message to the active buffer and wakes the flusher thread.
+// The caller blocks while the active buffer is full (bounded memory). For a
+// message whose first character is 'F' (a glog FATAL message), the caller
+// also waits for Flush() to complete before returning.
+void AsyncLogger::Write(bool force_flush,
+ time_t timestamp,
+ const char* message,
+ int message_len) {
+ {
+ MutexLock l(lock_);
+ DCHECK_EQ(state_, RUNNING);
+ // Back-pressure: wait for the flusher to swap in an empty buffer instead
+ // of growing memory without bound.
+ while (BufferFull(*active_buf_)) {
+ app_threads_blocked_count_for_tests_++;
+ free_buffer_cond_.Wait();
+ }
+ // The message text is copied into the buffer. 'force_flush' is only
+ // recorded on the buffer; the flusher thread performs the actual Flush().
+ active_buf_->add(Msg(timestamp, string(message, message_len)),
+ force_flush);
+ wake_flusher_cond_.Signal();
+ }
+
+ // In most cases, we take the 'force_flush' argument to mean that we'll let the logger
+ // thread do the flushing for us, but not block the application. However, for the
+ // special case of a FATAL log message, we really want to make sure that our message
+ // hits the log before we continue, or else it's likely that the application will exit
+ // while it's still in our buffer.
+ //
+ // NOTE: even if the application doesn't wrap the FATAL-level logger, log messages at
+ // FATAL are also written to all other log files with lower levels. So, a FATAL message
+ // will force a synchronous flush of all lower-level logs before exiting.
+ //
+ // Unfortunately, the underlying log level isn't passed through to this interface, so we
+ // have to use this hack: messages from FATAL errors start with the character 'F'.
+ if (message_len > 0 && message[0] == 'F') {
+ Flush();
+ }
+}
+
+// Blocks until everything buffered at the time of the call has been written
+// to, and flushed through, the wrapped logger (or until the logger stops).
+void AsyncLogger::Flush() {
+  MutexLock guard(lock_);
+  DCHECK_EQ(state_, RUNNING);
+
+  // Wait for at least two completed flusher passes. A pass that is already
+  // in progress may cover only the old flushing buffer. The next pass is the
+  // one that picks up the current active buffer with its flush bit set.
+  const uint64_t target_flush_count = flush_count_ + 2;
+  while (state_ == RUNNING && flush_count_ < target_flush_count) {
+    active_buf_->flush = true;
+    wake_flusher_cond_.Signal();
+    flush_complete_cond_.Wait();
+  }
+}
+
+// Reports the wrapped logger's size. Messages that are still sitting in this
+// class's buffers have not been handed to the wrapped logger yet, so they are
+// not counted.
+uint32 AsyncLogger::LogSize() {
+  google::base::Logger* const target = wrapped_;
+  return target->LogSize();
+}
+
+// Body of the flusher thread. Each pass waits for work, then swaps the active
+// and flushing buffers under 'lock_'. It writes the swapped-out messages to
+// the wrapped logger with the lock released, then bumps 'flush_count_'.
+// The loop keeps running after Stop() until the active buffer has nothing
+// left to write or flush.
+void AsyncLogger::RunThread() {
+ MutexLock l(lock_);
+ while (state_ == RUNNING || active_buf_->needs_flush_or_write()) {
+ // Sleep until there is something to write/flush or we are asked to stop.
+ while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) {
+ wake_flusher_cond_.Wait();
+ }
+
+ // 'flushing_buf_' was cleared at the end of the previous pass, so
+ // application threads now append into an empty buffer.
+ active_buf_.swap(flushing_buf_);
+ // If the buffer that we are about to flush was full, then
+ // we may have other threads which were blocked that we now
+ // need to wake up.
+ if (BufferFull(*flushing_buf_)) {
+ free_buffer_cond_.Broadcast();
+ }
+ l.Unlock();
+
+ // Slow IO happens without the lock, so application threads are not delayed.
+ // Messages are forwarded with force_flush=false; a single Flush() per
+ // batch below stands in for per-message flushing.
+ for (const auto& msg : flushing_buf_->messages) {
+ wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size());
+ }
+ if (flushing_buf_->flush) {
+ wrapped_->Flush();
+ }
+ flushing_buf_->clear();
+
+ l.Lock();
+ // Publish completion of this pass to any Flush() callers.
+ flush_count_++;
+ flush_complete_cond_.Broadcast();
+ }
+}
+
+// The total budget is split evenly between the two buffers. A buffer counts
+// as full once its estimated size exceeds its half of 'max_buffer_bytes_'.
+bool AsyncLogger::BufferFull(const Buffer& buf) const {
+  const int per_buffer_limit = max_buffer_bytes_ / 2;
+  return per_buffer_limit < buf.size;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.h b/src/kudu/util/async_logger.h
new file mode 100644
index 0000000..2804a9b
--- /dev/null
+++ b/src/kudu/util/async_logger.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Wrapper for a glog Logger which asynchronously writes log messages.
+// This class starts a new thread responsible for forwarding the messages
+// to the logger, and performs double buffering. Writers append to the
+// current buffer and then wake up the logger thread. The logger swaps in
+// a new buffer and writes any accumulated messages to the wrapped
+// Logger.
+//
+// This double-buffering behavior dramatically improves performance, especially
+// for logging messages which require flushing the underlying file (i.e. WARNING
+// and above by default). The flush can take a couple of milliseconds, and in
+// some cases can even block for hundreds of milliseconds or more. With the
+// double-buffered approach, threads can proceed with useful work while the IO
+// thread blocks.
+//
+// The semantics provided by this wrapper are slightly weaker than the default
+// glog semantics. By default, glog will immediately (synchronously) flush WARNING
+// and above to the underlying file, whereas here we are deferring that flush to
+// the separate thread. This means that a crash just after a 'LOG(WARNING)' may
+// lose that message from the logs, but the perf benefit is probably worth it.
+// We do take care that a glog FATAL message flushes all buffered log messages
+// before exiting.
+//
+// NOTE: the logger limits the total amount of buffer space, so if the underlying
+// log blocks for too long, eventually the threads generating the log messages
+// will block as well. This prevents runaway memory usage.
+class AsyncLogger : public google::base::Logger {
+ public:
+  // Wraps 'wrapped', which is not owned by this object. 'max_buffer_bytes' is
+  // the total buffer budget shared by the two internal buffers.
+  AsyncLogger(google::base::Logger* wrapped,
+              int max_buffer_bytes);
+  ~AsyncLogger() override;
+
+  // Start the background thread that forwards messages to the wrapped logger.
+  //
+  // REQUIRES: must not have been called before. Write() and Flush() may only
+  // be called after this.
+  void Start();
+
+  // Stop the thread. Flush() and Write() must not be called after this.
+  //
+  // NOTE: this is currently only used in tests: in real life, we enable async
+  // logging once when the program starts and then never disable it.
+  //
+  // REQUIRES: Start() must have been called.
+  void Stop();
+
+  // Write a message to the log.
+  //
+  // 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
+  // flag. Any messages logged at the configured level or higher result in 'force_flush'
+  // being set to true, indicating that the message should be immediately written to the
+  // log rather than buffered in memory. See the class-level docs above for more detail
+  // about the implementation provided here.
+  //
+  // REQUIRES: Start() must have been called.
+  void Write(bool force_flush,
+             time_t timestamp,
+             const char* message,
+             int message_len) override;
+
+  // Flush any buffered messages.
+  void Flush() override;
+
+  // Get the current LOG file size.
+  // The returned value is approximate since some
+  // logged data may not have been flushed to disk yet.
+  uint32 LogSize() override;
+
+  // Return a count of how many times an application thread was
+  // blocked due to the buffers being full and the writer thread
+  // not keeping up.
+  int app_threads_blocked_count_for_tests() const {
+    MutexLock l(lock_);
+    return app_threads_blocked_count_for_tests_;
+  }
+
+ private:
+  // A buffered message.
+  //
+  // TODO(todd): using std::string for buffered messages is convenient but not
+  // as efficient as it could be. Better would be to make the buffers just be
+  // Arenas and allocate both the message data and Msg struct from them, forming
+  // a linked list.
+  struct Msg {
+    time_t ts;
+    std::string message;
+
+    Msg(time_t ts, std::string message)
+        : ts(ts),
+          message(std::move(message)) {
+    }
+  };
+
+  // A buffer of messages waiting to be flushed.
+  struct Buffer {
+    std::vector<Msg> messages;
+
+    // Estimate of the size of 'messages'.
+    int size = 0;
+
+    // Whether this buffer needs an explicit flush of the
+    // underlying logger.
+    bool flush = false;
+
+    Buffer() {}
+
+    void clear() {
+      messages.clear();
+      size = 0;
+      flush = false;
+    }
+
+    void add(Msg msg, bool flush) {
+      size += sizeof(msg) + msg.message.size();
+      messages.emplace_back(std::move(msg));
+      this->flush |= flush;
+    }
+
+    bool needs_flush_or_write() const {
+      return flush || !messages.empty();
+    }
+
+   private:
+    DISALLOW_COPY_AND_ASSIGN(Buffer);
+  };
+
+  bool BufferFull(const Buffer& buf) const;
+  void RunThread();
+
+  // The maximum number of bytes used by the entire class.
+  const int max_buffer_bytes_;
+  google::base::Logger* const wrapped_;
+  std::thread thread_;
+
+  // Count of how many times an application thread was blocked due to
+  // a full buffer.
+  int app_threads_blocked_count_for_tests_ = 0;
+
+  // Count of how many times the writer thread has flushed the buffers.
+  // 64 bits should be enough to never worry about overflow.
+  uint64_t flush_count_ = 0;
+
+  // Protects buffers as well as 'state_'.
+  mutable Mutex lock_;
+
+  // Signaled by app threads to wake up the flusher, either for new
+  // data or because 'state_' changed.
+  ConditionVariable wake_flusher_cond_;
+
+  // Signaled by the flusher thread when the flusher has swapped in
+  // a free buffer to write to.
+  ConditionVariable free_buffer_cond_;
+
+  // Signaled by the flusher thread when it has completed flushing
+  // the current buffer.
+  ConditionVariable flush_complete_cond_;
+
+  // The buffer to which application threads append new log messages.
+  std::unique_ptr<Buffer> active_buf_;
+
+  // The buffer currently being flushed by the logger thread, cleared
+  // after a successful flush.
+  std::unique_ptr<Buffer> flushing_buf_;
+
+  // Lifecycle: INITTED -> RUNNING (Start) -> STOPPED (Stop). STOPPED also
+  // tells the logger thread to drain and exit.
+  enum State {
+    INITTED,
+    RUNNING,
+    STOPPED
+  };
+  State state_ = INITTED;
+
+  DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/debug/leakcheck_disabler.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug/leakcheck_disabler.h b/src/kudu/util/debug/leakcheck_disabler.h
index bab724f..815f818 100644
--- a/src/kudu/util/debug/leakcheck_disabler.h
+++ b/src/kudu/util/debug/leakcheck_disabler.h
@@ -17,7 +17,6 @@
#ifndef KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
#define KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
-#include <gperftools/heap-checker.h>
#include "kudu/gutil/macros.h"
#include "kudu/util/debug/leak_annotations.h"
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging-test.cc b/src/kudu/util/logging-test.cc
index 85ad3cd..08fc27d 100644
--- a/src/kudu/util/logging-test.cc
+++ b/src/kudu/util/logging-test.cc
@@ -18,11 +18,17 @@
#include <glog/logging.h>
#include <gmock/gmock.h>
#include <string>
+#include <thread>
#include <vector>
-#include "kudu/util/logging_test_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
using std::string;
using std::vector;
@@ -81,4 +87,77 @@ TEST(LoggingTest, TestAdvancedThrottling) {
EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
}
+// Minimal Logger for tests: it discards message contents and only tallies how
+// many messages and how many flushes it received.
+//
+// It deliberately has no synchronization. The AsyncLogger under test is
+// expected to call into its wrapped logger from a single thread only.
+class CountingLogger : public google::base::Logger {
+ public:
+  void Write(bool force_flush,
+             time_t /*timestamp*/,
+             const char* /*message*/,
+             int /*message_len*/) override {
+    ++message_count_;
+    if (!force_flush) {
+      return;
+    }
+    Flush();
+  }
+
+  // Pauses briefly to imitate a slow disk, then records the flush.
+  void Flush() override {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    ++flush_count_;
+  }
+
+  // Nothing is actually stored, so the size is always zero.
+  uint32 LogSize() override {
+    return 0;
+  }
+
+  int flush_count_ = 0;
+  int message_count_ = 0;
+};
+
+// Several threads log with force_flush=true while another thread issues
+// explicit Flush() calls. Checks that every message arrives, that flushes are
+// batched, and that the small buffer forces writers to block at least once.
+TEST(LoggingTest, TestAsyncLogger) {
+  const int kNumThreads = 4;
+  const int kNumMessages = 10000;
+  const int kBuffer = 10000;
+  CountingLogger base;
+  AsyncLogger async(&base, kBuffer);
+  async.Start();
+
+  vector<std::thread> threads;
+  // Writers plus the one flushing thread all start together from here.
+  Barrier go_barrier(kNumThreads + 1);
+
+  auto write_messages = [&]() {
+    go_barrier.Wait();
+    for (int m = 0; m < kNumMessages; m++) {
+      async.Write(true, m, "x", 1);
+    }
+  };
+  auto flush_periodically = [&]() {
+    go_barrier.Wait();
+    for (int i = 0; i < 10; i++) {
+      async.Flush();
+      SleepFor(MonoDelta::FromMilliseconds(3));
+    }
+  };
+
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back(write_messages);
+  }
+  threads.emplace_back(flush_periodically);
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  async.Stop();
+
+  ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads);
+  // Although every message asked for a flush, the async logger coalesces them
+  // into at most one flush per batch.
+  ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads);
+  ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0);
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 1032f97..d89988c 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -28,7 +28,9 @@
#include "kudu/gutil/callback.h"
#include "kudu/gutil/spinlock.h"
+#include "kudu/util/async_logger.h"
#include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
#include "kudu/util/flag_tags.h"
DEFINE_string(log_filename, "",
@@ -36,6 +38,16 @@ DEFINE_string(log_filename, "",
"full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]");
TAG_FLAG(log_filename, stable);
+// When true, InitGoogleLoggingSafe() calls EnableAsyncLogging() to wrap the
+// per-level glog loggers in AsyncLoggers. Hidden: kept only as a way to turn
+// the feature off if it causes problems.
+DEFINE_bool(log_async, true,
+ "Enable asynchronous writing to log files. This improves "
+ "latency and stability.");
+TAG_FLAG(log_async, hidden);
+
+// Buffer budget handed to each level's AsyncLogger. Each AsyncLogger splits
+// it evenly between its two buffers.
+DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+ "The number of bytes of buffer space used by each log "
+ "level. Only relevant when --log_async is enabled.");
+TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
+
#define PROJ_NAME "kudu"
bool logging_initialized = false;
@@ -100,6 +112,18 @@ SimpleSink* registered_sink = nullptr;
// Protected by 'logging_mutex'.
int initial_stderr_severity;
+void EnableAsyncLogging() {
+ debug::ScopedLeakCheckDisabler leaky;
+
+ // Enable Async for every level except for FATAL. Fatal should be synchronous
+ // to ensure that we get the fatal log message written before exiting.
+ for (auto level : { google::INFO, google::WARNING, google::ERROR }) {
+ auto* orig = google::base::GetLogger(level);
+ auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level);
+ async->Start();
+ google::base::SetLogger(level, async);
+ }
+}
void UnregisterLoggingCallbackUnlocked() {
CHECK(logging_mutex.IsHeld());
@@ -219,6 +243,11 @@ void InitGoogleLoggingSafe(const char* arg) {
// Stderr logging threshold: FLAGS_stderrthreshold.
// Sink logging: off.
initial_stderr_severity = FLAGS_stderrthreshold;
+
+ if (FLAGS_log_async) {
+ EnableAsyncLogging();
+ }
+
logging_initialized = true;
}