You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/17 16:39:12 UTC

[1/2] kudu git commit: thread: improve performance of starting threads

Repository: kudu
Updated Branches:
  refs/heads/master fe23710c6 -> d76220245


thread: improve performance of starting threads

This improves the performance of starting a new thread using
kudu::Thread::StartThread(). Previously, the process of starting the
thread was a little bit more complicated than necessary, involving a
sequence of the child waiting for the parent to do something, then the
parent waiting for the child to do something. Now, neither one has to
wait for the other to proceed.

The one wrinkle here is that for the parent to know the child's tid, the
parent does have to wait for the child to publish it. However, usage of
the 'tid()' function is relatively rare, so we can defer that waiting
until someone calls it, and then only wait in the odd case that the child
hasn't yet published its tid.

This patch includes a very simple benchmark which starts 1000 threads
and then checks the tid for each.

Timings based on the 'Time spent' log output:
---------------------------------------------
Before:
      wall              user             sys
 Min.   :0.06500   Min.   :0.0080   Min.   :0.0160
 1st Qu.:0.06975   1st Qu.:0.0160   1st Qu.:0.0200
 Median :0.07100   Median :0.0200   Median :0.0240
 Mean   :0.07080   Mean   :0.0198   Mean   :0.0234
 3rd Qu.:0.07200   3rd Qu.:0.0240   3rd Qu.:0.0280
 Max.   :0.08200   Max.   :0.0280   Max.   :0.0320

After:
      wall              user             sys
 Min.   :0.01800   Min.   :0.0040   Min.   :0.000
 1st Qu.:0.02575   1st Qu.:0.0080   1st Qu.:0.012
 Median :0.02700   Median :0.0120   Median :0.016
 Mean   :0.02680   Mean   :0.0126   Mean   :0.016
 3rd Qu.:0.02925   3rd Qu.:0.0160   3rd Qu.:0.020
 Max.   :0.03200   Max.   :0.0280   Max.   :0.028

perf-stat results
------------------
Before:
 Performance counter stats for 'build/latest/bin/thread-test --gtest_filter=*Benchmark* --gtest_repeat=10' (10 runs):

       1095.617237      task-clock (msec)         #    0.863 CPUs utilized            ( +-  0.54% )
            29,552      context-switches          #    0.027 M/sec                    ( +-  0.75% )
               354      cpu-migrations            #    0.323 K/sec                    ( +-  4.55% )
            20,848      page-faults               #    0.019 M/sec                    ( +-  0.11% )
     2,424,700,945      cycles                    #    2.213 GHz                      ( +-  0.35% )
     1,369,266,675      instructions              #    0.56  insn per cycle           ( +-  0.12% )
       286,851,480      branches                  #  261.817 M/sec                    ( +-  0.11% )
         5,926,839      branch-misses             #    2.07% of all branches          ( +-  0.39% )

       1.269172342 seconds time elapsed                                          ( +-  0.33% )

After:
 Performance counter stats for 'build/latest/bin/thread-test --gtest_filter=*Benchmark* --gtest_repeat=10' (10 runs):

        790.618424      task-clock (msec)         #    1.677 CPUs utilized            ( +-  0.77% )
            17,178      context-switches          #    0.022 M/sec                    ( +-  0.91% )
             3,869      cpu-migrations            #    0.005 M/sec                    ( +-  0.76% )
            20,921      page-faults               #    0.026 M/sec                    ( +-  0.05% )
     2,149,909,940      cycles                    #    2.719 GHz                      ( +-  0.41% )
     1,149,982,930      instructions              #    0.53  insn per cycle           ( +-  0.09% )
       239,776,764      branches                  #  303.277 M/sec                    ( +-  0.08% )
         4,890,927      branch-misses             #    2.04% of all branches          ( +-  0.62% )

       0.471358038 seconds time elapsed                                          ( +-  0.76% )

Change-Id: I5ce7f409ed33548142d1180c9f9653a8f51a7879
Reviewed-on: http://gerrit.cloudera.org:8080/8257
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9fa9cf18
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9fa9cf18
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9fa9cf18

Branch: refs/heads/master
Commit: 9fa9cf181ded1736c6014792fe619f5e0b9a94b4
Parents: fe23710
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Oct 10 20:38:03 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue Oct 17 16:36:27 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc |  4 ++
 src/kudu/util/thread-test.cc   | 22 ++++++++-
 src/kudu/util/thread.cc        | 98 ++++++++++++++++++++-----------------
 src/kudu/util/thread.h         | 34 ++++++++++---
 4 files changed, 105 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 450449e..2b8ae5f 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4799,6 +4799,10 @@ TEST_F(ClientTest, TestServerTooBusyRetry) {
                                    &ClientTest::CheckRowCount, this, client_table_.get(), kNumRows,
                                    &thread));
     threads.push_back(thread);
+    // Don't start threads too fast - otherwise we could accumulate tens or hundreds
+    // of threads before any of them starts their actual scans, and then it would
+    // take a long time to join on them all eventually finishing down below.
+    SleepFor(MonoDelta::FromMilliseconds(100));
 
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       scoped_refptr<Counter> counter = METRIC_rpcs_queue_overflow.Instantiate(

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread-test.cc b/src/kudu/util/thread-test.cc
index a7d3d9c..69b9cf0 100644
--- a/src/kudu/util/thread-test.cc
+++ b/src/kudu/util/thread-test.cc
@@ -22,6 +22,7 @@
 
 #include <ostream>
 #include <string>
+#include <vector>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
@@ -33,8 +34,9 @@
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
 #include "kudu/util/status.h"
-#include "kudu/util/test_util.h"
+#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
 #include "kudu/util/thread_restrictions.h"
 
 using std::string;
@@ -117,6 +119,24 @@ TEST_F(ThreadTest, TestCallOnExit) {
   ASSERT_EQ("hello 1, hello 2", s);
 }
 
+TEST_F(ThreadTest, ThreadStartBenchmark) {
+  std::vector<scoped_refptr<Thread>> threads(1000);
+  LOG_TIMING(INFO, "starting threads") {
+    for (auto& t : threads) {
+      ASSERT_OK(Thread::Create("test", "TestCallOnExit", usleep, 0, &t));
+    }
+  }
+  LOG_TIMING(INFO, "waiting for all threads to publish TIDs") {
+    for (auto& t : threads) {
+      t->tid();
+    }
+  }
+
+  for (auto& t : threads) {
+    t->Join();
+  }
+}
+
 // The following tests only run in debug mode, since thread restrictions are no-ops
 // in release builds.
 #ifndef NDEBUG

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.cc b/src/kudu/util/thread.cc
index 3674f06..0378bb6 100644
--- a/src/kudu/util/thread.cc
+++ b/src/kudu/util/thread.cc
@@ -50,7 +50,6 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/debug-util.h"
-#include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/kernel_stack_watchdog.h"
 #include "kudu/util/logging.h"
@@ -58,6 +57,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/os-util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/url-coding.h"
@@ -515,20 +515,57 @@ void Thread::CallAtExit(const Closure& cb) {
 }
 
 std::string Thread::ToString() const {
-  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid_, name_, category_);
+  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_);
 }
 
+int64_t Thread::WaitForTid() const {
+  const string log_prefix = Substitute("$0 ($1) ", name_, category_);
+  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
+                                   "waiting for new thread to publish its TID");
+  int loop_count = 0;
+  while (true) {
+    int64_t t = Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) return t;
+    boost::detail::yield(loop_count++);
+  }
+}
+
+
 Status Thread::StartThread(const std::string& category, const std::string& name,
                            const ThreadFunctor& functor, uint64_t flags,
                            scoped_refptr<Thread> *holder) {
   TRACE_COUNTER_INCREMENT("threads_started", 1);
   TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
+  GoogleOnceInit(&once, &InitThreading);
+
   const string log_prefix = Substitute("$0 ($1) ", name, category);
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");
 
   // Temporary reference for the duration of this function.
   scoped_refptr<Thread> t(new Thread(category, name, functor));
 
+  // Optional, and only set if the thread was successfully created.
+  //
+  // We have to set this before we even start the thread because it's
+  // allowed for the thread functor to access 'holder'.
+  if (holder) {
+    *holder = t;
+  }
+
+  t->tid_ = PARENT_WAITING_TID;
+
+  // Add a reference count to the thread since SuperviseThread() needs to
+  // access the thread object, and we have no guarantee that our caller
+  // won't drop the reference as soon as we return. This is dereferenced
+  // in FinishThread().
+  t->AddRef();
+
+  auto cleanup = MakeScopedCleanup([&]() {
+      // If we failed to create the thread, we need to undo all of our prep work.
+      t->tid_ = INVALID_TID;
+      t->Release();
+    });
+
   if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
     LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start";
     SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms));
@@ -549,28 +586,7 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
   // (or someone communicating with the parent) can join, so joinable must
   // be set before the parent returns.
   t->joinable_ = true;
-
-  // Optional, and only set if the thread was successfully created.
-  if (holder) {
-    *holder = t;
-  }
-
-  // The tid_ member goes through the following states:
-  // 1  CHILD_WAITING_TID: the child has just been spawned and is waiting
-  //    for the parent to finish writing to caller state (i.e. 'holder').
-  // 2. PARENT_WAITING_TID: the parent has updated caller state and is now
-  //    waiting for the child to write the tid.
-  // 3. <value>: both the parent and the child are free to continue. If the
-  //    value is INVALID_TID, the child could not discover its tid.
-  Release_Store(&t->tid_, PARENT_WAITING_TID);
-  {
-    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
-                                     "waiting for new thread to publish its TID");
-    int loop_count = 0;
-    while (Acquire_Load(&t->tid_) == PARENT_WAITING_TID) {
-      boost::detail::yield(loop_count++);
-    }
-  }
+  cleanup.cancel();
 
   VLOG(2) << "Started thread " << t->tid()<< " - " << category << ":" << name;
   return Status::OK();
@@ -579,14 +595,9 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
 void* Thread::SuperviseThread(void* arg) {
   Thread* t = static_cast<Thread*>(arg);
   int64_t system_tid = Thread::CurrentThreadId();
-  if (system_tid == -1) {
-    string error_msg = ErrnoToString(errno);
-    KLOG_EVERY_N(INFO, 100) << "Could not determine thread ID: " << error_msg;
-  }
-  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+  PCHECK(system_tid != -1);
 
   // Take an additional reference to the thread manager, which we'll need below.
-  GoogleOnceInit(&once, &InitThreading);
   ANNOTATE_IGNORE_SYNC_BEGIN();
   shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
   ANNOTATE_IGNORE_SYNC_END();
@@ -594,21 +605,17 @@ void* Thread::SuperviseThread(void* arg) {
   // Set up the TLS.
   //
   // We could store a scoped_refptr in the TLS itself, but as its
-  // lifecycle is poorly defined, we'll use a bare pointer and take an
-  // additional reference on t out of band, in thread_ref.
-  scoped_refptr<Thread> thread_ref = t;
-  t->tls_ = t;
+  // lifecycle is poorly defined, we'll use a bare pointer. We
+  // already incremented the reference count in StartThread.
+  Thread::tls_ = t;
 
-  // Wait until the parent has updated all caller-visible state, then write
-  // the TID to 'tid_', thus completing the parent<-->child handshake.
-  int loop_count = 0;
-  while (Acquire_Load(&t->tid_) == CHILD_WAITING_TID) {
-    boost::detail::yield(loop_count++);
-  }
+  // Publish our tid to 'tid_', which unblocks any callers waiting in
+  // WaitForTid().
   Release_Store(&t->tid_, system_tid);
 
-  thread_manager->SetThreadName(name, t->tid());
-  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid());
+  string name = strings::Substitute("$0-$1", t->name(), system_tid);
+  thread_manager->SetThreadName(name, t->tid_);
+  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid_);
 
   // FinishThread() is guaranteed to run (even if functor_ throws an
   // exception) because pthread_cleanup_push() creates a scoped object
@@ -636,8 +643,11 @@ void Thread::FinishThread(void* arg) {
   // Signal any Joiner that we're done.
   t->done_.CountDown();
 
-  VLOG(2) << "Ended thread " << t->tid() << " - "
-          << t->category() << ":" << t->name();
+  VLOG(2) << "Ended thread " << t->tid_ << " - " << t->category() << ":" << t->name();
+  t->Release();
+  // NOTE: the above 'Release' call could be the last reference to 'this',
+  // so 'this' could be destructed at this point. Do not add any code
+  // following here!
 }
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9fa9cf18/src/kudu/util/thread.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/thread.h b/src/kudu/util/thread.h
index c8b5191..f404c30 100644
--- a/src/kudu/util/thread.h
+++ b/src/kudu/util/thread.h
@@ -36,6 +36,7 @@
 #include <boost/bind.hpp>     // IWYU pragma: keep
 #include <boost/function.hpp> // IWYU pragma: keep
 
+#include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
@@ -215,9 +216,18 @@ class Thread : public RefCountedThreadSafe<Thread> {
   // This callback is guaranteed to be called except in the case of a process crash.
   void CallAtExit(const Closure& cb);
 
-  // The thread ID assigned to this thread by the operating system. If the OS does not
-  // support retrieving the tid, returns Thread::INVALID_TID.
-  int64_t tid() const { return tid_; }
+  // The thread ID assigned to this thread by the operating system. If the thread
+  // has not yet started running, returns INVALID_TID.
+  //
+  // NOTE: this may block for a short amount of time if the thread has just been
+  // started.
+  int64_t tid() const {
+    int64_t t = base::subtle::Acquire_Load(&tid_);
+    if (t != PARENT_WAITING_TID) {
+      return tid_;
+    }
+    return WaitForTid();
+  }
 
   // Returns the thread's pthread ID.
   pthread_t pthread_id() const { return thread_; }
@@ -279,12 +289,10 @@ class Thread : public RefCountedThreadSafe<Thread> {
  private:
   friend class ThreadJoiner;
 
-  // The various special values for tid_ that describe the various steps
-  // in the parent<-->child handshake.
+  // See 'tid_' docs.
   enum {
     INVALID_TID = -1,
-    CHILD_WAITING_TID = -2,
-    PARENT_WAITING_TID = -3,
+    PARENT_WAITING_TID = -2,
   };
 
   // Function object that wraps the user-supplied function to run in a separate thread.
@@ -294,7 +302,7 @@ class Thread : public RefCountedThreadSafe<Thread> {
       : thread_(0),
         category_(std::move(category)),
         name_(std::move(name)),
-        tid_(CHILD_WAITING_TID),
+        tid_(INVALID_TID),
         functor_(std::move(functor)),
         done_(1),
         joinable_(false) {}
@@ -308,6 +316,13 @@ class Thread : public RefCountedThreadSafe<Thread> {
 
   // OS-specific thread ID. Once the constructor finishes StartThread(),
   // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+  //
+  // The tid_ member goes through the following states:
+  // 1. INVALID_TID: the thread has not been started, or has already exited.
+  // 2. PARENT_WAITING_TID: the parent has started the thread, but the
+  //    thread has not yet begun running. Therefore the TID is not yet known
+  //    but it will be set once the thread starts.
+  // 3. <positive value>: the thread is running.
   int64_t tid_;
 
   // User function to be executed by this thread.
@@ -328,6 +343,9 @@ class Thread : public RefCountedThreadSafe<Thread> {
 
   std::vector<Closure> exit_callbacks_;
 
+  // Wait for the running thread to publish its tid.
+  int64_t WaitForTid() const;
+
   // Starts the thread running SuperviseThread(), and returns once that thread has
   // initialised and its TID has been read. Waits for notification from the started
   // thread that initialisation is complete before returning. On success, stores a


[2/2] kudu git commit: once: report init_succeeded instead of initted

Posted by ad...@apache.org.
once: report init_succeeded instead of initted

We use KuduOnceDynamic to lazily open various objects, ensuring various
initializations happen once and only once. By design, after the first
call to its member function, further calls to Init() will no-op, and the
KuduOnceDynamic will report that it has been initted. This can be
problematic if the function fails, as in a few places, we condition on
this initted state, assuming "initted" means "init succeeded".

This patch changes KuduOnceDynamic's API to report whether the init
succeeded, still maintaining the property that the function is run once.

One codepath that is particularly vulnerable to this is the InitOnce of
DeltaFileReader, which reads some stats from disk into memory, and can
go un-initialized until scan-time. Before, if a deltafile were corrupted
and scanned, the scan would return with an error upon initializing the
DeltaFileReader (which would at that point be considered "initted"). If
it were scanned a second time, seeing that it had been initted, the
DeltaFileReader would attempt to access the in-memory stats and hit a
nullptr error. A test is added to tablet_server-test to demonstrate
that this fails reliably.

Change-Id: I7ac7131144392d673e0a72a1ba9920bcf9fd991c
Reviewed-on: http://gerrit.cloudera.org:8080/8276
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d7622024
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d7622024
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d7622024

Branch: refs/heads/master
Commit: d7622024579b93956a86dbe1b61dedd890adcbfc
Parents: 9fa9cf1
Author: Andrew Wong <aw...@cloudera.com>
Authored: Fri Oct 13 14:22:12 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Tue Oct 17 16:38:40 2017 +0000

----------------------------------------------------------------------
 src/kudu/cfile/bloomfile.cc                    |  2 +-
 src/kudu/cfile/cfile-test.cc                   | 23 ++------
 src/kudu/cfile/cfile_reader.cc                 |  6 +--
 src/kudu/cfile/cfile_reader.h                  |  8 +--
 src/kudu/fs/fs-test-util.h                     | 30 +++++++++++
 src/kudu/tablet/deltafile.cc                   |  4 +-
 src/kudu/tablet/deltafile.h                    |  6 +--
 src/kudu/tserver/tablet_copy_source_session.cc |  6 +--
 src/kudu/tserver/tablet_copy_source_session.h  |  8 +--
 src/kudu/tserver/tablet_server-test.cc         | 59 +++++++++++++++++++++
 src/kudu/util/once-test.cc                     |  8 +--
 src/kudu/util/once.h                           | 27 ++++++----
 12 files changed, 134 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/cfile/bloomfile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/bloomfile.cc b/src/kudu/cfile/bloomfile.cc
index fdbaad7..4277045 100644
--- a/src/kudu/cfile/bloomfile.cc
+++ b/src/kudu/cfile/bloomfile.cc
@@ -283,7 +283,7 @@ Status BloomFileReader::ParseBlockHeader(const Slice &block,
 
 Status BloomFileReader::CheckKeyPresent(const BloomKeyProbe &probe,
                                         bool *maybe_present) {
-  DCHECK(init_once_.initted());
+  DCHECK(init_once_.init_succeeded());
 
   // Since we frequently will access the same BloomFile many times in a row
   // when processing a batch of operations, we put our state in a small thread-local

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/cfile/cfile-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc
index f7ffe85..c9f0a09 100644
--- a/src/kudu/cfile/cfile-test.cc
+++ b/src/kudu/cfile/cfile-test.cc
@@ -96,6 +96,7 @@ namespace cfile {
 
 using fs::BlockManager;
 using fs::CountingReadableBlock;
+using fs::CreateCorruptBlock;
 using fs::ReadableBlock;
 using fs::WritableBlock;
 
@@ -362,27 +363,13 @@ class TestCFile : public CFileTestBase {
 
   Status CorruptAndReadBlock(const BlockId block_id, const uint64_t corrupt_offset,
                              uint8_t flip_bit) {
-    // Read the input block
-    unique_ptr<ReadableBlock> source;
-    RETURN_NOT_OK(fs_manager_->OpenBlock(block_id, &source));
-    uint64_t file_size;
-    RETURN_NOT_OK(source->Size(&file_size));
-    uint8_t data_scratch[file_size];
-    Slice data(data_scratch, file_size);
-    RETURN_NOT_OK(source->Read(0, data));
-
-    // Corrupt the data and write to a new block
-    uint8_t orig = data.data()[corrupt_offset];
-    uint8_t corrupt = orig ^ (static_cast<uint8_t>(1) << flip_bit);
-    data.mutable_data()[corrupt_offset] = corrupt;
-    unique_ptr<fs::WritableBlock> writer;
-    RETURN_NOT_OK(fs_manager_->CreateNewBlock({}, &writer));
-    RETURN_NOT_OK(writer->Append(data));
-    RETURN_NOT_OK(writer->Close());
+    BlockId new_id;
+    RETURN_NOT_OK(CreateCorruptBlock(
+        fs_manager_.get(), block_id, corrupt_offset, flip_bit, &new_id));
 
     // Open and read the corrupt block with the CFileReader
     unique_ptr<ReadableBlock> corrupt_source;
-    RETURN_NOT_OK(fs_manager_->OpenBlock(writer->id(), &corrupt_source));
+    RETURN_NOT_OK(fs_manager_->OpenBlock(new_id, &corrupt_source));
     unique_ptr<CFileReader> reader;
     RETURN_NOT_OK(CFileReader::Open(std::move(corrupt_source), ReaderOptions(), &reader));
     gscoped_ptr<IndexTreeIterator> iter;

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/cfile/cfile_reader.cc
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc
index f69a377..c6e163a 100644
--- a/src/kudu/cfile/cfile_reader.cc
+++ b/src/kudu/cfile/cfile_reader.cc
@@ -202,7 +202,7 @@ Status CFileReader::Init() {
 Status CFileReader::ReadAndParseHeader() {
   TRACE_EVENT1("io", "CFileReader::ReadAndParseHeader",
                "cfile", ToString());
-  DCHECK(!init_once_.initted());
+  DCHECK(!init_once_.init_succeeded());
 
   // First read and parse the "pre-header", which lets us know
   // that it is indeed a CFile and tells us the length of the
@@ -254,7 +254,7 @@ Status CFileReader::ReadAndParseHeader() {
 Status CFileReader::ReadAndParseFooter() {
   TRACE_EVENT1("io", "CFileReader::ReadAndParseFooter",
                "cfile", ToString());
-  DCHECK(!init_once_.initted());
+  DCHECK(!init_once_.init_succeeded());
   CHECK_GT(file_size_, kMagicAndLengthSize) <<
     "file too short: " << file_size_;
 
@@ -419,7 +419,7 @@ class ScratchMemory {
 
 Status CFileReader::ReadBlock(const BlockPointer &ptr, CacheControl cache_control,
                               BlockHandle *ret) const {
-  DCHECK(init_once_.initted());
+  DCHECK(init_once_.init_succeeded());
   CHECK(ptr.offset() > 0 &&
         ptr.offset() + ptr.size() < file_size_) <<
     "bad offset " << ptr.ToString() << " in file of size "

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/cfile/cfile_reader.h
----------------------------------------------------------------------
diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h
index 16e54ae..42d7b0b 100644
--- a/src/kudu/cfile/cfile_reader.h
+++ b/src/kudu/cfile/cfile_reader.h
@@ -132,12 +132,12 @@ class CFileReader {
   }
 
   const TypeInfo *type_info() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return type_info_;
   }
 
   const TypeEncodingInfo *type_encoding_info() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return type_encoding_info_;
   }
 
@@ -146,12 +146,12 @@ class CFileReader {
   }
 
   const CFileHeaderPB &header() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return *DCHECK_NOTNULL(header_.get());
   }
 
   const CFileFooterPB &footer() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return *DCHECK_NOTNULL(footer_.get());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/fs/fs-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/fs/fs-test-util.h b/src/kudu/fs/fs-test-util.h
index 7fa8ee2..a8e7257 100644
--- a/src/kudu/fs/fs-test-util.h
+++ b/src/kudu/fs/fs-test-util.h
@@ -23,8 +23,10 @@
 #include <vector>
 
 #include "kudu/fs/block_manager.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/util/array_view.h"
 #include "kudu/util/malloc.h"
+#include "kudu/util/slice.h"
 
 namespace kudu {
 namespace fs {
@@ -88,6 +90,34 @@ class CountingReadableBlock : public ReadableBlock {
   size_t* bytes_read_;
 };
 
+// Creates a copy of the specified block and corrupts a byte of its data at the
+// given 'corrupt_offset' by flipping a bit at offset 'flip_bit'. Returns the
+// block id of the corrupted block. Does not change the original block.
+inline Status CreateCorruptBlock(FsManager* fs_manager, const BlockId in_id,
+    const uint64_t corrupt_offset, uint8_t flip_bit, BlockId* out_id) {
+  DCHECK_LT(flip_bit, 8);
+
+  // Read the input block
+  std::unique_ptr<ReadableBlock> source;
+  RETURN_NOT_OK(fs_manager->OpenBlock(in_id, &source));
+  uint64_t file_size;
+  RETURN_NOT_OK(source->Size(&file_size));
+  uint8_t data_scratch[file_size];
+  Slice data(data_scratch, file_size);
+  RETURN_NOT_OK(source->Read(0, data));
+
+  // Corrupt the data and write to a new block
+  uint8_t orig = data.data()[corrupt_offset];
+  uint8_t corrupt = orig ^ (static_cast<uint8_t>(1) << flip_bit);
+  data.mutable_data()[corrupt_offset] = corrupt;
+  std::unique_ptr<WritableBlock> writer;
+  RETURN_NOT_OK(fs_manager->CreateNewBlock({}, &writer));
+  RETURN_NOT_OK(writer->Append(data));
+  RETURN_NOT_OK(writer->Close());
+  *out_id = writer->id();
+  return Status::OK();
+}
+
 } // namespace fs
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index f4a391d..60ddedb 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -268,7 +268,7 @@ Status DeltaFileReader::ReadDeltaStats() {
 }
 
 bool DeltaFileReader::IsRelevantForSnapshot(const MvccSnapshot& snap) const {
-  if (!init_once_.initted()) {
+  if (!init_once_.init_succeeded()) {
     // If we're not initted, it means we have no delta stats and must
     // assume that this file is relevant for every snapshot.
     return true;
@@ -298,7 +298,7 @@ Status DeltaFileReader::NewDeltaIterator(const Schema *projection,
                                          DeltaIterator** iterator) const {
   if (IsRelevantForSnapshot(snap)) {
     if (VLOG_IS_ON(2)) {
-      if (!init_once_.initted()) {
+      if (!init_once_.init_succeeded()) {
         TRACE_COUNTER_INCREMENT("delta_iterators_lazy_initted", 1);
 
         VLOG(2) << (delta_type_ == REDO ? "REDO" : "UNDO") << " delta " << ToString()

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 75a6b79..8a2d703 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -154,7 +154,7 @@ class DeltaFileReader : public DeltaStore,
   virtual Status Init() OVERRIDE;
 
   virtual bool Initted() OVERRIDE {
-    return init_once_.initted();
+    return init_once_.init_succeeded();
   }
 
   // See DeltaStore::NewDeltaIterator(...)
@@ -170,12 +170,12 @@ class DeltaFileReader : public DeltaStore,
   const BlockId& block_id() const { return reader_->block_id(); }
 
   virtual const DeltaStats& delta_stats() const OVERRIDE {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return *delta_stats_;
   }
 
   virtual std::string ToString() const OVERRIDE {
-    if (!init_once_.initted()) return reader_->ToString();
+    if (!init_once_.init_succeeded()) return reader_->ToString();
     return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 8c491f9..229fe3b 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -299,7 +299,7 @@ Status TabletCopySourceSession::GetBlockPiece(const BlockId& block_id,
                                              uint64_t offset, int64_t client_maxlen,
                                              string* data, int64_t* block_file_size,
                                              TabletCopyErrorPB::Code* error_code) {
-  DCHECK(init_once_.initted());
+  DCHECK(init_once_.init_succeeded());
   ImmutableReadableBlockInfo* block_info;
   RETURN_NOT_OK(FindBlock(block_id, &block_info, error_code));
 
@@ -319,7 +319,7 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                    uint64_t offset, int64_t client_maxlen,
                                                    std::string* data, int64_t* log_file_size,
                                                    TabletCopyErrorPB::Code* error_code) {
-  DCHECK(init_once_.initted());
+  DCHECK(init_once_.init_succeeded());
   ImmutableRandomAccessFileInfo* file_info;
   RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
   RETURN_NOT_OK(ReadFileChunkToBuf(file_info, offset, client_maxlen,
@@ -332,7 +332,7 @@ Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
 }
 
 bool TabletCopySourceSession::IsBlockOpenForTests(const BlockId& block_id) const {
-  DCHECK(init_once_.initted());
+  DCHECK(init_once_.init_succeeded());
   return ContainsKey(blocks_, block_id);
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/tserver/tablet_copy_source_session.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.h b/src/kudu/tserver/tablet_copy_source_session.h
index 324a8d0..4f6c350 100644
--- a/src/kudu/tserver/tablet_copy_source_session.h
+++ b/src/kudu/tserver/tablet_copy_source_session.h
@@ -111,7 +111,7 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
 
   // Returns true if this session has been initialized.
   bool IsInitialized() const {
-    return init_once_.initted();
+    return init_once_.init_succeeded();
   }
 
   // Return ID of tablet corresponding to this session.
@@ -142,17 +142,17 @@ class TabletCopySourceSession : public RefCountedThreadSafe<TabletCopySourceSess
                             TabletCopyErrorPB::Code* error_code);
 
   const tablet::TabletSuperBlockPB& tablet_superblock() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return tablet_superblock_;
   }
 
   const consensus::ConsensusStatePB& initial_cstate() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return initial_cstate_;
   }
 
   const log::SegmentSequence& log_segments() const {
-    DCHECK(init_once_.initted());
+    DCHECK(init_once_.init_succeeded());
     return log_segments_;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 8016d47..bb3b13f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -49,6 +49,8 @@
 #include "kudu/consensus/log-test-base.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/fs/block_id.h"
+#include "kudu/fs/fs-test-util.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/callback.h"
@@ -101,11 +103,13 @@
 using google::protobuf::util::MessageDifferencer;
 using kudu::clock::Clock;
 using kudu::clock::HybridClock;
+using kudu::fs::CreateCorruptBlock;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::Messenger;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::RpcController;
+using kudu::tablet::RowSetDataPB;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TabletSuperBlockPB;
@@ -1243,6 +1247,61 @@ TEST_F(TabletServerTest, TestExpiredScanner) {
   ASSERT_EQ(TabletServerErrorPB::SCANNER_EXPIRED, resp.error().code());
 }
 
+TEST_F(TabletServerTest, TestScanCorruptedDeltas) {
+  // Ensure some rows get to disk with deltas.
+  InsertTestRowsDirect(0, 100);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  UpdateTestRowRemote(0, 1, 100);
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+
+  // Fudge with some delta blocks.
+  TabletSuperBlockPB superblock_pb;
+  tablet_replica_->tablet()->metadata()->ToSuperBlock(&superblock_pb);
+  FsManager* fs_manager = mini_server_->server()->fs_manager();
+  for (int rowset_no = 0; rowset_no < superblock_pb.rowsets_size(); rowset_no++) {
+    RowSetDataPB* rowset_pb = superblock_pb.mutable_rowsets(rowset_no);
+    for (int id = 0; id < rowset_pb->undo_deltas_size(); id++) {
+      BlockId block_id(rowset_pb->undo_deltas(id).block().id());
+      BlockId new_block_id;
+      // Make a copy of each block and rewrite the superblock to include these
+      // newly corrupted blocks.
+      ASSERT_OK(CreateCorruptBlock(fs_manager, block_id, 0, 0, &new_block_id));
+      rowset_pb->mutable_undo_deltas(id)->mutable_block()->set_id(new_block_id.id());
+    }
+  }
+  // Grab the deltafiles and corrupt them.
+  const string& meta_path = fs_manager->GetTabletMetadataPath(tablet_replica_->tablet_id());
+  ShutdownTablet();
+
+  // Flush the corruption and rebuild the server with the corrupt data.
+  ASSERT_OK(pb_util::WritePBContainerToPath(env_,
+      meta_path, superblock_pb, pb_util::OVERWRITE, pb_util::SYNC));
+  ASSERT_OK(ShutdownAndRebuildTablet());
+  LOG(INFO) << Substitute("Rebuilt tablet $0 with broken blocks", tablet_replica_->tablet_id());
+
+  // Now open a scanner for the server.
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  RpcController rpc;
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+
+  // Send the call. This first call should attempt to init the corrupted
+  // deltafiles and return with an error. The second call should see that the
+  // previous call to init failed and should return the failed status.
+  req.set_batch_size_bytes(10000);
+  for (int i = 0; i < 2; i++) {
+    rpc.Reset();
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(resp.error().status().code(), AppStatusPB::CORRUPTION);
+    ASSERT_STR_CONTAINS(resp.error().status().message(), "failed to init CFileReader");
+  }
+}
+
 TEST_F(TabletServerTest, TestScannerOpenWhenServerShutsDown) {
   InsertTestRowsDirect(0, 1);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/util/once-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/once-test.cc b/src/kudu/util/once-test.cc
index e5eecc7..c0a79b1 100644
--- a/src/kudu/util/once-test.cc
+++ b/src/kudu/util/once-test.cc
@@ -64,12 +64,12 @@ TEST(TestOnce, KuduOnceDynamicTest) {
   {
     Thing t(false);
     ASSERT_EQ(0, t.value_);
-    ASSERT_FALSE(t.once_.initted());
+    ASSERT_FALSE(t.once_.init_succeeded());
 
     for (int i = 0; i < 2; i++) {
       ASSERT_OK(t.Init());
       ASSERT_EQ(1, t.value_);
-      ASSERT_TRUE(t.once_.initted());
+      ASSERT_TRUE(t.once_.init_succeeded());
     }
   }
 
@@ -78,7 +78,7 @@ TEST(TestOnce, KuduOnceDynamicTest) {
     for (int i = 0; i < 2; i++) {
       ASSERT_TRUE(t.Init().IsIllegalState());
       ASSERT_EQ(0, t.value_);
-      ASSERT_TRUE(t.once_.initted());
+      ASSERT_FALSE(t.once_.init_succeeded());
     }
   }
 }
@@ -88,7 +88,7 @@ static void InitOrGetInitted(Thing* t, int i) {
     LOG(INFO) << "Thread " << i << " initting";
     t->Init();
   } else {
-    LOG(INFO) << "Thread " << i << " value: " << t->once_.initted();
+    LOG(INFO) << "Thread " << i << " value: " << t->once_.init_succeeded();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/d7622024/src/kudu/util/once.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/once.h b/src/kudu/util/once.h
index da26107..522d800 100644
--- a/src/kudu/util/once.h
+++ b/src/kudu/util/once.h
@@ -42,19 +42,24 @@ template<typename T>
 void InitCb(void* arg) {
   MemberFunc<T>* mf = reinterpret_cast<MemberFunc<T>*>(arg);
   mf->once->status_ = (mf->instance->*mf->member_func)();
-  mf->once->set_initted();
+  if (PREDICT_TRUE(mf->once->status_.ok())) {
+    mf->once->set_init_succeeded();
+  }
 }
 
 } // namespace internal
 
 // More versatile version of GoogleOnceDynamic, including the following:
-// 1. Can be used with single-arg, non-static member functions.
-// 2. Retains results and overall initialization state for repeated access.
-// 3. Access to initialization state is safe for concurrent use.
+// - Non-static member functions are registered and run via Init().
+// - The first time Init() is called, the registered function is run and the
+//   resulting status is stored.
+// - Regardless of whether Init() succeeded, the function will cease to run on
+//   subsequent calls to Init(), and the stored result will be returned instead.
+// - Access to initialization state is safe for concurrent use.
 class KuduOnceDynamic {
  public:
   KuduOnceDynamic()
-    : initted_(false) {
+    : init_succeeded_(false) {
   }
 
   // If the underlying GoogleOnceDynamic has yet to be invoked, invokes the
@@ -78,13 +83,13 @@ class KuduOnceDynamic {
     return status_;
   }
 
-  // kMemOrderAcquire ensures that loads/stores that come after initted()
+  // kMemOrderAcquire ensures that loads/stores that come after init_succeeded()
   // aren't reordered to come before it instead. kMemOrderRelease ensures
-  // the opposite (i.e. loads/stores before set_initted() aren't reordered
+  // the opposite (i.e. loads/stores before set_init_succeeded() aren't reordered
   // to come after it).
   //
-  // Taken together, threads can safely synchronize on initted_.
-  bool initted() const { return initted_.Load(kMemOrderAcquire); }
+  // Taken together, threads can safely synchronize on init_succeeded_.
+  bool init_succeeded() const { return init_succeeded_.Load(kMemOrderAcquire); }
 
   // Returns the memory usage of this object without the object itself. Should
   // be used when embedded inside another object.
@@ -98,9 +103,9 @@ class KuduOnceDynamic {
   template<typename T>
   friend void internal::InitCb(void* arg);
 
-  void set_initted() { initted_.Store(true, kMemOrderRelease); }
+  void set_init_succeeded() { init_succeeded_.Store(true, kMemOrderRelease); }
 
-  AtomicBool initted_;
+  AtomicBool init_succeeded_;
   GoogleOnceDynamic once_;
   Status status_;
 };