Posted to commits@kudu.apache.org by mp...@apache.org on 2016/12/05 17:46:45 UTC

[1/5] kudu git commit: exactly_once_rpc-test: fix gc stress test flakiness

Repository: kudu
Updated Branches:
  refs/heads/master 0ac14a564 -> e14b82496


exactly_once_rpc-test: fix gc stress test flakiness

This test involves two threads:

1) the 'stubborn writer' thread retries a request with the same sequence
number over and over. It expects that eventually the cached result will
go stale, and then later that the client will be entirely GCed and thus
the request will start to succeed again.

2) the 'long write' thread, which uses the normal RetriableRpc mechanism
to send requests, each with increasing sequence numbers. We expect that,
since each of these requests is a new one, and isn't retried once it's
successful, we won't see any 'stale' responses.

The test was flaky, however, because the 'stubborn writer' thread was
always sending its own sequence number as the first_incomplete sequence
number, and we also didn't ensure that it started before the 'long
write' thread. Given that, it was possible to have this interleaving:

  1) start the 'long write' thread, which is assigned seq number 1
  2) before the write is sent, the 'stubborn writer' thread assigns
     itself seq number 2, and sends a request indicating first_incomplete=2.
  3) when the 'long write' thread sends its request, it immediately gets
     a 'stale' response, causing a test failure.

One fix would have been to make the 'stubborn writer' thread send the
first_incomplete calculated by the RequestTracker. However, that would
have involved modifying a bunch of other tests to properly update the
RequestTracker.

So instead this test takes the approach of assigning the 'stubborn
writer' thread its sequence number before starting the 'long write'
thread. This ensures that the 'stubborn writer' won't explicitly GC any
request made by the 'long write' thread.
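
As a rough illustration of the race and the fix, here is a toy model in
C++. It is only a sketch under stated assumptions: ToyClientState, Track()
and ToyResult are hypothetical names, not the real ResultTracker API, and
the model covers only the explicit GC triggered by the first-incomplete
watermark that a request carries (time-based GC is left out).

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Hypothetical outcome of tracking one request in the toy model.
enum class ToyResult { kNew, kDuplicate, kStale };

// Toy per-client state. Assumption: every request carries its own sequence
// number plus a "first incomplete" watermark, and the server may forget
// (GC) everything below the highest watermark it has seen.
class ToyClientState {
 public:
  ToyResult Track(int64_t seq_no, int64_t first_incomplete) {
    assert(seq_no >= 0);
    if (first_incomplete > stale_before_) {
      // The client promised never to retry anything below the watermark,
      // so the results recorded for those requests can be dropped.
      stale_before_ = first_incomplete;
      completed_.erase(completed_.begin(),
                       completed_.lower_bound(stale_before_));
    }
    if (seq_no < stale_before_) {
      return ToyResult::kStale;
    }
    // insert() reports whether the sequence number was seen before.
    if (!completed_.insert(seq_no).second) {
      return ToyResult::kDuplicate;
    }
    return ToyResult::kNew;
  }

 private:
  int64_t stale_before_ = 0;
  std::set<int64_t> completed_;
};
```

In this model, Track(2, 2) followed by Track(1, 1) reproduces the flaky
interleaving: the second call is answered as stale. When the stubborn
request holds sequence number 0 and the other requests report a watermark
of 0, Track(0, 0) followed by Track(1, 0) are both treated as new.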

With the patch, I looped this test 500 times and it passed[1]. Without
the patch, it failed 64/500[2].

[1] http://dist-test.cloudera.org//job?job_id=todd.1480926593.3793
[2] http://dist-test.cloudera.org//job?job_id=todd.1480926999.4126

Change-Id: I30a7d06928973964c5285e5e86503e5871ea5995
Reviewed-on: http://gerrit.cloudera.org:8080/5358
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0a898331
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0a898331
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0a898331

Branch: refs/heads/master
Commit: 0a898331120aabd37c82a18259dc498f27a6527a
Parents: 0ac14a5
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Dec 5 16:27:40 2016 +0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Dec 5 14:20:14 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/exactly_once_rpc-test.cc | 35 ++++++++++++++++++++----------
 1 file changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0a898331/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/exactly_once_rpc-test.cc b/src/kudu/rpc/exactly_once_rpc-test.cc
index 707d06e..fc1f07f 100644
--- a/src/kudu/rpc/exactly_once_rpc-test.cc
+++ b/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -279,12 +279,11 @@ class ExactlyOnceRpcTest : public RpcTestBase {
   // Stubbornly sends the same request to the server, this should observe three states.
   // The request should be successful at first, then its result should be GCed and the
   // client should be GCed.
-  void StubbornlyWriteTheSameRequestThread(MonoDelta run_for) {
+  void StubbornlyWriteTheSameRequestThread(ResultTracker::SequenceNumber sequence_number,
+                                           MonoDelta run_for) {
     MonoTime run_until = MonoTime::Now();
     run_until.AddDelta(run_for);
     // Make an initial request, so that we get a response to compare to.
-    ResultTracker::SequenceNumber sequence_number;
-    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
     ExactlyOnceResponsePB original_response;
     CHECK_OK(MakeAddCall(sequence_number, 0, &original_response));
 
@@ -459,10 +458,12 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
     vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
     for (int j = 0; j < kNumThreads; j++) {
       unique_ptr<SimultaneousExactlyOnceAdder> adder(
-          new SimultaneousExactlyOnceAdder(proxy_.get(), i, 1,
-                                           rand() % 20,
-                                           rand() % 10,
-                                           attempt_nos_.fetch_add(1)));
+          new SimultaneousExactlyOnceAdder(proxy_.get(),
+                                           i, // sequence number
+                                           1, // value
+                                           rand() % 20, // client_sleep
+                                           rand() % 10, // server_sleep
+                                           attempt_nos_.fetch_add(1))); // attempt number
       adders.push_back(std::move(adder));
       adders[j]->Start();
     }
@@ -550,15 +551,25 @@ TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest)
     stubborn_run_for = MonoDelta::FromSeconds(11);
   }
 
-  scoped_refptr<kudu::Thread> write_thread;
-  scoped_refptr<kudu::Thread> stubborn_thread;
   result_tracker_->StartGCThread();
+
+  // Assign the first sequence number (0) to the 'stubborn writes' thread.
+  // This thread will keep making RPCs with this sequence number while
+  // the 'write_thread' will make normal requests with increasing sequence
+  // numbers.
+  ResultTracker::SequenceNumber stubborn_req_seq_num;
+  CHECK_OK(request_tracker_->NewSeqNo(&stubborn_req_seq_num));
+  ASSERT_EQ(stubborn_req_seq_num, 0);
+
+  scoped_refptr<kudu::Thread> stubborn_thread;
+  CHECK_OK(kudu::Thread::Create(
+      "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
+      this, stubborn_req_seq_num, stubborn_run_for, &stubborn_thread));
+
+  scoped_refptr<kudu::Thread> write_thread;
   CHECK_OK(kudu::Thread::Create(
       "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
       this, writes_run_for, &write_thread));
-  CHECK_OK(kudu::Thread::Create(
-      "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
-      this, stubborn_run_for, &stubborn_thread));
 
   write_thread->Join();
   stubborn_thread->Join();


[2/5] kudu git commit: rpc-test: fix flakiness in tests that expect timeouts

Posted by mp...@apache.org.
rpc-test: fix flakiness in tests that expect timeouts

The DoTestExpectTimeout() utility function would set a timeout of N
milliseconds and then ask the server to sleep for N + 50 milliseconds,
expecting a timeout. It would then assert that the timeout was returned
after at least N ms but less than N + 50 ms.

This would be flaky under concurrent load (e.g. stress threads) because
the sleep(50ms) might sometimes actually sleep for an extra 50-100ms.

This just changes the test to ask the server to sleep for N + 500 ms,
giving it a lot more budget for sloppiness.
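
The arithmetic can be sketched as follows. This is a simplified model, not
the real DoTestExpectTimeout(): ToyTimeoutRun and both helper functions
are hypothetical, and scheduling delay under load is reduced to a single
jitter_ms term added to the moment the client observes the timeout.

```cpp
#include <cassert>
#include <cstdint>

// One simulated run of a timeout test (all values in milliseconds).
struct ToyTimeoutRun {
  int64_t timeout_ms;  // N: the call timeout requested by the client
  int64_t slack_ms;    // extra time the server sleeps beyond N
  int64_t jitter_ms;   // assumed lateness caused by a loaded machine
};

// Moment at which the client observes the timeout in this model.
int64_t ObservedElapsedMs(const ToyTimeoutRun& run) {
  assert(run.jitter_ms >= 0);
  return run.timeout_ms + run.jitter_ms;
}

// The check holds only if the timeout is seen no earlier than N and
// strictly before N + slack.
bool TimeoutAssertionHolds(const ToyTimeoutRun& run) {
  const int64_t elapsed = ObservedElapsedMs(run);
  return elapsed >= run.timeout_ms &&
         elapsed < run.timeout_ms + run.slack_ms;
}
```

With N = 100 and 80 ms of jitter, a 50 ms slack fails the check while a
500 ms slack passes it.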

I looped TestCallTimeout/0 with 4 stress threads 1000 times in TSAN.
Before[1] it failed 4/1000. After[2] it didn't fail.

[1] http://dist-test.cloudera.org//job?job_id=todd.1480912345.2054
[2] http://dist-test.cloudera.org//job?job_id=todd.1480912461.12007

Change-Id: Ifff555634968bc92f453b25af4d5c15da21edf7c
Reviewed-on: http://gerrit.cloudera.org:8080/5356
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/916c21be
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/916c21be
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/916c21be

Branch: refs/heads/master
Commit: 916c21be1846effb5325614f278eb4ace23f7f37
Parents: 0a89833
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Dec 5 12:34:18 2016 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Dec 5 17:32:38 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/rpc-test-base.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/916c21be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 133cc14..c6c87a8 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -483,8 +483,8 @@ class RpcTestBase : public KuduTest {
   void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
     SleepRequestPB req;
     SleepResponsePB resp;
-    // Sleep for 50ms longer than the call timeout.
-    int sleep_micros = timeout.ToMicroseconds() + 50000;
+    // Sleep for 500ms longer than the call timeout.
+    int sleep_micros = timeout.ToMicroseconds() + 500 * 1000;
     req.set_sleep_micros(sleep_micros);
 
     RpcController c;


[4/5] kudu git commit: fault_injection: use _exit instead of exit

Posted by mp...@apache.org.
fault_injection: use _exit instead of exit

80ac8bae335b490c7b75351e6d4c321a58183c73 caused some tests to fail in
TSAN due to the atexit handlers running when a process crashed with an
injected fault.

This switches to using _exit() instead of exit(), which is equivalent
except that atexit handlers do not run.
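
The difference can be observed with a small POSIX-only sketch. The helper
names and the exit status 85 below are arbitrary choices for illustration
and are not taken from fault_injection.cc. A forked child registers an
atexit handler that writes one byte to a pipe, then terminates with either
exit() or _exit(); the parent checks whether the byte arrived.

```cpp
#include <cassert>
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

namespace {

int g_pipe_write_fd = -1;

// atexit handler: leaves a one-byte marker in the pipe so that the parent
// process can tell that the handler ran.
void AtExitMarker() {
  const char marker = 'A';
  ssize_t ignored = write(g_pipe_write_fd, &marker, 1);
  (void)ignored;
}

}  // namespace

// Forks a child that registers AtExitMarker() and then terminates with
// exit() or _exit(). Returns true if the handler ran in the child.
bool AtExitHandlerRan(bool use_underscore_exit) {
  int fds[2];
  if (pipe(fds) != 0) abort();
  const pid_t pid = fork();
  if (pid < 0) abort();
  if (pid == 0) {
    // Child: keep only the write end of the pipe.
    close(fds[0]);
    g_pipe_write_fd = fds[1];
    atexit(AtExitMarker);
    if (use_underscore_exit) {
      _exit(85);  // terminates immediately, skipping atexit handlers
    }
    exit(85);  // runs atexit handlers before terminating
  }
  // Parent: read() returns 1 if the marker was written, or 0 (EOF) if the
  // child terminated without writing anything.
  close(fds[1]);
  char buf = 0;
  const ssize_t n = read(fds[0], &buf, 1);
  close(fds[0]);
  int status = 0;
  waitpid(pid, &status, 0);
  assert(WIFEXITED(status));
  return n == 1;
}
```

AtExitHandlerRan(false) is expected to return true and
AtExitHandlerRan(true) to return false.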

Change-Id: I72e30bfa750ace22e1e736e258b3f5720b25a651
Reviewed-on: http://gerrit.cloudera.org:8080/5361
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4a8c86b4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4a8c86b4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4a8c86b4

Branch: refs/heads/master
Commit: 4a8c86b409367df27a743920b74cd93a54c499d2
Parents: 85b86d1
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Dec 5 22:34:00 2016 +0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Dec 5 17:34:50 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/fault_injection.cc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4a8c86b4/src/kudu/util/fault_injection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/fault_injection.cc b/src/kudu/util/fault_injection.cc
index 6a359a6..a14c8f3 100644
--- a/src/kudu/util/fault_injection.cc
+++ b/src/kudu/util/fault_injection.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/fault_injection.h"
 
+#include <stdlib.h>
 #include <sys/time.h>
 
 #include "kudu/gutil/once.h"
@@ -51,7 +52,9 @@ void DoMaybeFault(const char* fault_str, double fraction) {
     return;
   }
   LOG(ERROR) << "Injecting fault: " << fault_str << " (process will exit)";
-  exit(kExitStatus);
+  // _exit will exit the program without running atexit handlers. This more
+  // accurately simulates a crash.
+  _exit(kExitStatus);
 }
 
 void DoInjectRandomLatency(double max_latency_ms) {


[3/5] kudu git commit: KUDU-695. Avoid glog contention by deferring log writes to dedicated threads

Posted by mp...@apache.org.
KUDU-695. Avoid glog contention by deferring log writes to dedicated threads

This patch changes the logging init sequence to install a wrapper around
the built-in glog Loggers. The wrapper starts a new thread which does
double-buffering: log entries are buffered in a vector, and the thread
is woken up. The thread swaps in a clean buffer to avoid delaying
application threads while it flushes the original buffer to disk.

It's hard to test the end-to-end integration in a unit test, since the
unit tests disable logging to files, and this path only affects
file-based logging. However, I ran an earlier version of this patch in a
stress test environment and it seemed to reduce the frequency with which
I saw threads blocked on glog.

The patch does, however, have a test which exercises the new code paths,
including the blocking path. I looped the new test 500 times in TSAN
mode with success.

The new feature is enabled by default, but I left a hidden flag to
disable it in case we have any issues.

Change-Id: Ie22a0a29fa00a988a53a15d2c726ce5d49018f4d
Reviewed-on: http://gerrit.cloudera.org:8080/5321
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/85b86d12
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/85b86d12
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/85b86d12

Branch: refs/heads/master
Commit: 85b86d12c7ee1238a23f7745bb477b66b000a236
Parents: 916c21b
Author: Todd Lipcon <to...@apache.org>
Authored: Thu Dec 1 22:12:13 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Dec 5 17:34:10 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/CMakeLists.txt             |   1 +
 src/kudu/util/async_logger.cc            | 149 +++++++++++++++++++
 src/kudu/util/async_logger.h             | 202 ++++++++++++++++++++++++++
 src/kudu/util/debug/leakcheck_disabler.h |   1 -
 src/kudu/util/logging-test.cc            |  81 ++++++++++-
 src/kudu/util/logging.cc                 |  29 ++++
 6 files changed, 461 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 0977461..a89cc0b 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -108,6 +108,7 @@ if (${OPENSSL_VERSION} VERSION_LESS "1.0.2")
 endif()
 
 set(UTIL_SRCS
+  async_logger.cc
   atomic.cc
   bitmap.cc
   bloom_filter.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.cc b/src/kudu/util/async_logger.cc
new file mode 100644
index 0000000..b6456de
--- /dev/null
+++ b/src/kudu/util/async_logger.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/async_logger.h"
+
+#include <algorithm>
+#include <string>
+#include <thread>
+
+#include "kudu/util/locks.h"
+
+using std::string;
+
+namespace kudu {
+
+AsyncLogger::AsyncLogger(google::base::Logger* wrapped,
+                         int max_buffer_bytes) :
+    max_buffer_bytes_(max_buffer_bytes),
+    wrapped_(DCHECK_NOTNULL(wrapped)),
+    wake_flusher_cond_(&lock_),
+    free_buffer_cond_(&lock_),
+    flush_complete_cond_(&lock_),
+    active_buf_(new Buffer()),
+    flushing_buf_(new Buffer()) {
+  DCHECK_GT(max_buffer_bytes_, 0);
+}
+
+AsyncLogger::~AsyncLogger() {}
+
+void AsyncLogger::Start() {
+  CHECK_EQ(state_, INITTED);
+  state_ = RUNNING;
+  thread_ = std::thread(&AsyncLogger::RunThread, this);
+}
+
+void AsyncLogger::Stop() {
+  {
+    MutexLock l(lock_);
+    CHECK_EQ(state_, RUNNING);
+    state_ = STOPPED;
+    wake_flusher_cond_.Signal();
+  }
+  thread_.join();
+  CHECK(active_buf_->messages.empty());
+  CHECK(flushing_buf_->messages.empty());
+}
+
+void AsyncLogger::Write(bool force_flush,
+                        time_t timestamp,
+                        const char* message,
+                        int message_len) {
+  {
+    MutexLock l(lock_);
+    DCHECK_EQ(state_, RUNNING);
+    while (BufferFull(*active_buf_)) {
+      app_threads_blocked_count_for_tests_++;
+      free_buffer_cond_.Wait();
+    }
+    active_buf_->add(Msg(timestamp, string(message, message_len)),
+                     force_flush);
+    wake_flusher_cond_.Signal();
+  }
+
+  // In most cases, we take the 'force_flush' argument to mean that we'll let the logger
+  // thread do the flushing for us, but not block the application. However, for the
+  // special case of a FATAL log message, we really want to make sure that our message
+  // hits the log before we continue, or else it's likely that the application will exit
+  // while it's still in our buffer.
+  //
+  // NOTE: even if the application doesn't wrap the FATAL-level logger, log messages at
+  // FATAL are also written to all other log files with lower levels. So, a FATAL message
+  // will force a synchronous flush of all lower-level logs before exiting.
+  //
+  // Unfortunately, the underlying log level isn't passed through to this interface, so we
+  // have to use this hack: messages from FATAL errors start with the character 'F'.
+  if (message_len > 0 && message[0] == 'F') {
+    Flush();
+  }
+}
+
+void AsyncLogger::Flush() {
+  MutexLock l(lock_);
+  DCHECK_EQ(state_, RUNNING);
+
+  // Wake up the writer thread at least twice.
+  // This ensures that it has completely flushed both buffers.
+  uint64_t orig_flush_count = flush_count_;
+  while (flush_count_ < orig_flush_count + 2 &&
+         state_ == RUNNING) {
+    active_buf_->flush = true;
+    wake_flusher_cond_.Signal();
+    flush_complete_cond_.Wait();
+  }
+}
+
+uint32 AsyncLogger::LogSize() {
+  return wrapped_->LogSize();
+}
+
+void AsyncLogger::RunThread() {
+  MutexLock l(lock_);
+  while (state_ == RUNNING || active_buf_->needs_flush_or_write()) {
+    while (!active_buf_->needs_flush_or_write() && state_ == RUNNING) {
+      wake_flusher_cond_.Wait();
+    }
+
+    active_buf_.swap(flushing_buf_);
+    // If the buffer that we are about to flush was full, then
+    // we may have other threads which were blocked that we now
+    // need to wake up.
+    if (BufferFull(*flushing_buf_)) {
+      free_buffer_cond_.Broadcast();
+    }
+    l.Unlock();
+
+    for (const auto& msg : flushing_buf_->messages) {
+      wrapped_->Write(false, msg.ts, msg.message.data(), msg.message.size());
+    }
+    if (flushing_buf_->flush) {
+      wrapped_->Flush();
+    }
+    flushing_buf_->clear();
+
+    l.Lock();
+    flush_count_++;
+    flush_complete_cond_.Broadcast();
+  }
+}
+
+bool AsyncLogger::BufferFull(const Buffer& buf) const {
+  // We evenly divide our total buffer space between the two buffers.
+  return buf.size > (max_buffer_bytes_ / 2);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/async_logger.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/async_logger.h b/src/kudu/util/async_logger.h
new file mode 100644
index 0000000..2804a9b
--- /dev/null
+++ b/src/kudu/util/async_logger.h
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/locks.h"
+
+namespace kudu {
+
+// Wrapper for a glog Logger which asynchronously writes log messages.
+// This class starts a new thread responsible for forwarding the messages
+// to the logger, and performs double buffering. Writers append to the
+// current buffer and then wake up the logger thread. The logger swaps in
+// a new buffer and writes any accumulated messages to the wrapped
+// Logger.
+//
+// This double-buffering behavior dramatically improves performance, especially
+// for logging messages which require flushing the underlying file (i.e WARNING
+// and above for default). The flush can take a couple of milliseconds, and in
+// some cases can even block for hundreds of milliseconds or more. With the
+// double-buffered approach, threads can proceed with useful work while the IO
+// thread blocks.
+//
+// The semantics provided by this wrapper are slightly weaker than the default
+// glog semantics. By default, glog will immediately (synchronously) flush WARNING
+// and above to the underlying file, whereas here we are deferring that flush to
+// the separate thread. This means that a crash just after a 'LOG(WARNING)'
+// may be missing the message in the logs, but the perf benefit is probably
+// worth it. We do take care that a glog FATAL message flushes all buffered log
+// messages before exiting.
+//
+// NOTE: the logger limits the total amount of buffer space, so if the underlying
+// log blocks for too long, eventually the threads generating the log messages
+// will block as well. This prevents runaway memory usage.
+class AsyncLogger : public google::base::Logger {
+ public:
+  AsyncLogger(google::base::Logger* wrapped,
+              int max_buffer_bytes);
+  ~AsyncLogger();
+
+  void Start();
+
+  // Stop the thread. Flush() and Write() must not be called after this.
+  //
+  // NOTE: this is currently only used in tests: in real life, we enable async
+  // logging once when the program starts and then never disable it.
+  //
+  // REQUIRES: Start() must have been called.
+  void Stop();
+
+  // Write a message to the log.
+  //
+  // 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
+  // flag. Any messages logged at the configured level or higher result in 'force_flush'
+  // being set to true, indicating that the message should be immediately written to the
+  // log rather than buffered in memory. See the class-level docs above for more detail
+  // about the implementation provided here.
+  //
+  // REQUIRES: Start() must have been called.
+  void Write(bool force_flush,
+             time_t timestamp,
+             const char* message,
+             int message_len) override;
+
+  // Flush any buffered messages.
+  void Flush() override;
+
+  // Get the current LOG file size.
+  // The returned value is approximate since some
+  // logged data may not have been flushed to disk yet.
+  uint32 LogSize() override;
+
+  // Return a count of how many times an application thread was
+  // blocked due to the buffers being full and the writer thread
+  // not keeping up.
+  int app_threads_blocked_count_for_tests() const {
+    MutexLock l(lock_);
+    return app_threads_blocked_count_for_tests_;
+  }
+
+ private:
+  // A buffered message.
+  //
+  // TODO(todd): using std::string for buffered messages is convenient but not
+  // as efficient as it could be. Better would be to make the buffers just be
+  // Arenas and allocate both the message data and Msg struct from them, forming
+  // a linked list.
+  struct Msg {
+    time_t ts;
+    std::string message;
+
+    Msg(time_t ts, std::string message)
+        : ts(ts),
+          message(std::move(message)) {
+    }
+  };
+
+  // A buffer of messages waiting to be flushed.
+  struct Buffer {
+    std::vector<Msg> messages;
+
+    // Estimate of the size of 'messages'.
+    int size = 0;
+
+    // Whether this buffer needs an explicit flush of the
+    // underlying logger.
+    bool flush = false;
+
+    Buffer() {}
+
+    void clear() {
+      messages.clear();
+      size = 0;
+      flush = false;
+    }
+
+    void add(Msg msg, bool flush) {
+      size += sizeof(msg) + msg.message.size();
+      messages.emplace_back(std::move(msg));
+      this->flush |= flush;
+    }
+
+    bool needs_flush_or_write() const {
+      return flush || !messages.empty();
+    }
+
+   private:
+    DISALLOW_COPY_AND_ASSIGN(Buffer);
+  };
+
+  bool BufferFull(const Buffer& buf) const;
+  void RunThread();
+
+  // The maximum number of bytes used by the entire class.
+  const int max_buffer_bytes_;
+  google::base::Logger* const wrapped_;
+  std::thread thread_;
+
+  // Count of how many times an application thread was blocked due to
+  // a full buffer.
+  int app_threads_blocked_count_for_tests_ = 0;
+
+  // Count of how many times the writer thread has flushed the buffers.
+  // 64 bits should be enough to never worry about overflow.
+  uint64_t flush_count_ = 0;
+
+  // Protects buffers as well as 'state_'.
+  mutable Mutex lock_;
+
+  // Signaled by app threads to wake up the flusher, either for new
+  // data or because 'state_' changed.
+  ConditionVariable wake_flusher_cond_;
+
+  // Signaled by the flusher thread when the flusher has swapped in
+  // a free buffer to write to.
+  ConditionVariable free_buffer_cond_;
+
+  // Signaled by the flusher thread when it has completed flushing
+  // the current buffer.
+  ConditionVariable flush_complete_cond_;
+
+  // The buffer to which application threads append new log messages.
+  std::unique_ptr<Buffer> active_buf_;
+
+  // The buffer currently being flushed by the logger thread, cleared
+  // after a successful flush.
+  std::unique_ptr<Buffer> flushing_buf_;
+
+  // Trigger for the logger thread to stop.
+  enum State {
+    INITTED,
+    RUNNING,
+    STOPPED
+  };
+  State state_ = INITTED;
+
+  DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/debug/leakcheck_disabler.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/debug/leakcheck_disabler.h b/src/kudu/util/debug/leakcheck_disabler.h
index bab724f..815f818 100644
--- a/src/kudu/util/debug/leakcheck_disabler.h
+++ b/src/kudu/util/debug/leakcheck_disabler.h
@@ -17,7 +17,6 @@
 #ifndef KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
 #define KUDU_UTIL_DEBUG_LEAKCHECK_DISABLER_H_
 
-#include <gperftools/heap-checker.h>
 #include "kudu/gutil/macros.h"
 #include "kudu/util/debug/leak_annotations.h"
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging-test.cc b/src/kudu/util/logging-test.cc
index 85ad3cd..08fc27d 100644
--- a/src/kudu/util/logging-test.cc
+++ b/src/kudu/util/logging-test.cc
@@ -18,11 +18,17 @@
 #include <glog/logging.h>
 #include <gmock/gmock.h>
 #include <string>
+#include <thread>
 #include <vector>
 
-#include "kudu/util/logging_test_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
 
 using std::string;
 using std::vector;
@@ -81,4 +87,77 @@ TEST(LoggingTest, TestAdvancedThrottling) {
   EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
 }
 
+// Test Logger implementation that just counts the number of messages
+// and flushes.
+//
+// This is purposefully thread-unsafe because we expect that the
+// AsyncLogger is only accessing the underlying logger from a single
+// thread.
+class CountingLogger : public google::base::Logger {
+ public:
+  void Write(bool force_flush,
+             time_t /*timestamp*/,
+             const char* /*message*/,
+             int /*message_len*/) override {
+    message_count_++;
+    if (force_flush) {
+      Flush();
+    }
+  }
+
+  void Flush() override {
+    // Simulate a slow disk.
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    flush_count_++;
+  }
+
+  uint32 LogSize() override {
+    return 0;
+  }
+
+  int flush_count_ = 0;
+  int message_count_ = 0;
+};
+
+TEST(LoggingTest, TestAsyncLogger) {
+  const int kNumThreads = 4;
+  const int kNumMessages = 10000;
+  const int kBuffer = 10000;
+  CountingLogger base;
+  AsyncLogger async(&base, kBuffer);
+  async.Start();
+
+  vector<std::thread> threads;
+  Barrier go_barrier(kNumThreads + 1);
+  // Start some threads writing log messages.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+        go_barrier.Wait();
+        for (int m = 0; m < kNumMessages; m++) {
+          async.Write(true, m, "x", 1);
+        }
+      });
+  }
+
+  // And a thread calling Flush().
+  threads.emplace_back([&]() {
+      go_barrier.Wait();
+      for (int i = 0; i < 10; i++) {
+        async.Flush();
+        SleepFor(MonoDelta::FromMilliseconds(3));
+      }
+    });
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  async.Stop();
+  ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads);
+  // The async logger should only flush once per "batch" rather than
+  // once per message, even though we wrote every message with
+  // 'flush' set to true.
+  ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads);
+  ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/85b86d12/src/kudu/util/logging.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/logging.cc b/src/kudu/util/logging.cc
index 1032f97..d89988c 100644
--- a/src/kudu/util/logging.cc
+++ b/src/kudu/util/logging.cc
@@ -28,7 +28,9 @@
 
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/spinlock.h"
+#include "kudu/util/async_logger.h"
 #include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
 #include "kudu/util/flag_tags.h"
 
 DEFINE_string(log_filename, "",
@@ -36,6 +38,16 @@ DEFINE_string(log_filename, "",
     "full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]");
 TAG_FLAG(log_filename, stable);
 
+DEFINE_bool(log_async, true,
+            "Enable asynchronous writing to log files. This improves "
+            "latency and stability.");
+TAG_FLAG(log_async, hidden);
+
+DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+             "The number of bytes of buffer space used by each log "
+             "level. Only relevant when --log_async is enabled.");
+TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
+
 #define PROJ_NAME "kudu"
 
 bool logging_initialized = false;
@@ -100,6 +112,18 @@ SimpleSink* registered_sink = nullptr;
 // Protected by 'logging_mutex'.
 int initial_stderr_severity;
 
+void EnableAsyncLogging() {
+  debug::ScopedLeakCheckDisabler leaky;
+
+  // Enable Async for every level except for FATAL. Fatal should be synchronous
+  // to ensure that we get the fatal log message written before exiting.
+  for (auto level : { google::INFO, google::WARNING, google::ERROR }) {
+    auto* orig = google::base::GetLogger(level);
+    auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level);
+    async->Start();
+    google::base::SetLogger(level, async);
+  }
+}
 
 void UnregisterLoggingCallbackUnlocked() {
   CHECK(logging_mutex.IsHeld());
@@ -219,6 +243,11 @@ void InitGoogleLoggingSafe(const char* arg) {
   // Stderr logging threshold: FLAGS_stderrthreshold.
   // Sink logging: off.
   initial_stderr_severity = FLAGS_stderrthreshold;
+
+  if (FLAGS_log_async) {
+    EnableAsyncLogging();
+  }
+
   logging_initialized = true;
 }
 


[5/5] kudu git commit: raft_consensus: avoid some unnecessary allocations in hot path

Posted by mp...@apache.org.
raft_consensus: avoid some unnecessary allocations in hot path

In the stress cluster I noticed that UpdateConsensus does a lot of
allocation in LockForUpdate. In particular, it was constructing a
ConsensusStatePB just to check whether the local peer was currently a
voter, and that only to provide a log message. That PB contains many
strings and other objects, which caused unnecessary allocation.

We can already get this same information from the existing RaftConfigPB
object in ConsensusMeta, and far more cheaply.
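
The cost difference can be illustrated with a toy C++ model. All of the
types below are hypothetical stand-ins, not the real protobuf classes, and
a global counter makes deep copies of the config visible. Building a fresh
state object copies the config, whereas reading the existing config
through a const reference copies nothing.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Counts deep copies of ToyConfig so the cost difference is visible.
int g_config_copies = 0;

struct ToyPeer {
  std::string uuid;
  bool is_voter;
};

struct ToyConfig {
  std::vector<ToyPeer> peers;
  ToyConfig() = default;
  ToyConfig(const ToyConfig& other) : peers(other.peers) {
    ++g_config_copies;
  }
};

// Stand-in for a state message that bundles the config with other fields.
struct ToyState {
  ToyConfig config;
  std::string leader_uuid;
};

// Stand-in for the metadata object that already owns the active config.
class ToyMeta {
 public:
  void AddPeer(const std::string& uuid, bool is_voter) {
    config_.peers.push_back({uuid, is_voter});
  }

  // Expensive path: assembles a new state object, copying the config.
  ToyState BuildState() const { return ToyState{config_, leader_uuid_}; }

  // Cheap path: exposes the existing config without copying it.
  const ToyConfig& active_config() const { return config_; }

 private:
  ToyConfig config_;
  std::string leader_uuid_;
};

// Returns true if 'uuid' is listed in 'config' as a voter.
bool IsVoter(const std::string& uuid, const ToyConfig& config) {
  assert(!uuid.empty());
  for (const ToyPeer& peer : config.peers) {
    if (peer.uuid == uuid) return peer.is_voter;
  }
  return false;
}
```

Both IsVoter("a", meta.BuildState().config) and
IsVoter("a", meta.active_config()) give the same answer, but only the
first call increments g_config_copies.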

Change-Id: I75abcaaaed281e5ac1768ea3014064925db6c030
Reviewed-on: http://gerrit.cloudera.org:8080/5344
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e14b8249
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e14b8249
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e14b8249

Branch: refs/heads/master
Commit: e14b82496da992c40af3fbf2c24a97f38b1d96cc
Parents: 4a8c86b
Author: Todd Lipcon <to...@apache.org>
Authored: Fri Dec 2 18:15:10 2016 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Dec 5 17:39:32 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus_state.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e14b8249/src/kudu/consensus/raft_consensus_state.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_state.cc b/src/kudu/consensus/raft_consensus_state.cc
index a0b38d4..af5943b 100644
--- a/src/kudu/consensus/raft_consensus_state.cc
+++ b/src/kudu/consensus/raft_consensus_state.cc
@@ -145,7 +145,7 @@ Status ReplicaState::LockForUpdate(UniqueLock* lock) const {
   if (PREDICT_FALSE(state_ != kRunning)) {
     return Status::IllegalState("Replica not in running state");
   }
-  if (!IsRaftConfigVoter(peer_uuid_, ConsensusStateUnlocked(CONSENSUS_CONFIG_ACTIVE).config())) {
+  if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
   }
   lock->swap(l);