You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/03/26 00:36:58 UTC

[1/2] kudu git commit: log: simplify log append code path, reduce contention

Repository: kudu
Updated Branches:
  refs/heads/master 286de5392 -> 86da259a6


log: simplify log append code path, reduce contention

Previously the log was trying to do some tricky sequence like:

- reserve a spot in the log queue and wake up the appender thread
- serialize the log entry
             - appender thread blocks waiting for the entry to be
               "ready"
- mark ready
             - appender thread continues and writes the data

I can't recall the original logic behind this, but looking at lock
contention profiles on a write-heavy workload shows that the appender
threads spinning waiting for entries to be ready wastes CPU.

The new design is much more straightforward: the writer serializes and
appends an entry which is already "ready" to write, so once the appender
thread wakes up, it's ready to go without any spinning.

In addition to being easier to follow, the new code performs slightly better.

with patch:

 Performance counter stats for './build/latest/bin/mt-log-test -num_batches_per_thread=100000
     -log_segment_size_mb=64 -num_reader_threads=0 -noverify_log -num_writer_threads=16' (5 runs):

      59890.395565 task-clock                #    3.753 CPUs utilized            ( +-  0.51% )
         1,619,439 context-switches          #    0.027 M/sec                    ( +-  0.28% )
             4,565 cpu-migrations            #    0.076 K/sec                    ( +- 14.44% )
           136,512 page-faults               #    0.002 M/sec                    ( +-  0.16% )
   169,286,052,105 cycles                    #    2.827 GHz                      ( +-  0.57% )
   <not supported> stalled-cycles-frontend
   <not supported> stalled-cycles-backend
   116,801,292,905 instructions              #    0.69  insns per cycle          ( +-  0.04% )
    22,368,661,704 branches                  #  373.493 M/sec                    ( +-  0.07% )
        90,076,839 branch-misses             #    0.40% of all branches          ( +-  0.26% )

      15.958703468 seconds time elapsed                                          ( +-  1.49% )

without patch:

 Performance counter stats for './build/latest/bin/mt-log-test -num_batches_per_thread=100000
     -log_segment_size_mb=64 -num_reader_threads=0 -noverify_log -num_writer_threads=16' (5 runs):

      60787.723645 task-clock                #    3.708 CPUs utilized            ( +-  0.34% )
         1,628,039 context-switches          #    0.027 M/sec                    ( +-  0.32% )
             4,259 cpu-migrations            #    0.070 K/sec                    ( +- 13.18% )
           136,827 page-faults               #    0.002 M/sec                    ( +-  0.09% )
   171,893,298,705 cycles                    #    2.828 GHz                      ( +-  0.33% )
   <not supported> stalled-cycles-frontend
   <not supported> stalled-cycles-backend
   117,302,850,747 instructions              #    0.68  insns per cycle          ( +-  0.05% )
    22,492,934,759 branches                  #  370.024 M/sec                    ( +-  0.08% )
        93,862,359 branch-misses             #    0.42% of all branches          ( +-  0.31% )

      16.392552672 seconds time elapsed                                          ( +-  0.93% )

Change-Id: I2a9154efbab2964a63745a70b47162e3f4200660
Reviewed-on: http://gerrit.cloudera.org:8080/6284
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4d2ea24b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4d2ea24b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4d2ea24b

Branch: refs/heads/master
Commit: 4d2ea24b45aa35a1a6f17e17ccec19c106a4478e
Parents: 286de53
Author: Todd Lipcon <to...@apache.org>
Authored: Mon Mar 6 21:14:29 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Mar 24 23:48:17 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log.cc      | 131 +++++++++++-------------------------
 src/kudu/consensus/log.h       |  93 ++++++-------------------
 src/kudu/consensus/log_util.cc |   8 +--
 src/kudu/consensus/log_util.h  |   8 +--
 4 files changed, 64 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2ea24b/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index d51e326..a145767 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -216,18 +216,17 @@ void Log::AppendThread::RunThread() {
 
     bool is_all_commits = true;
     for (LogEntryBatch* entry_batch : entry_batches) {
-      entry_batch->WaitForReady();
       TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
       Status s = log_->DoAppend(entry_batch);
       if (PREDICT_FALSE(!s.ok())) {
         LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
-        entry_batch->set_failed_to_append();
         // TODO(af): If a single transaction fails to append, should we
         // abort all subsequent transactions in this batch or allow
         // them to be appended? What about transactions in future
         // batches?
         if (!entry_batch->callback().is_null()) {
           entry_batch->callback().Run(s);
+          entry_batch->callback_.Reset();
         }
       }
       if (is_all_commits && entry_batch->type_ != COMMIT) {
@@ -251,8 +250,7 @@ void Log::AppendThread::RunThread() {
       VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
       SCOPED_WATCH_STACK(100);
       for (LogEntryBatch* entry_batch : entry_batches) {
-        if (PREDICT_TRUE(!entry_batch->failed_to_append()
-                         && !entry_batch->callback().is_null())) {
+        if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
           entry_batch->callback().Run(Status::OK());
         }
         // It's important to delete each batch as we see it, because
@@ -433,85 +431,54 @@ Status Log::RollOver() {
   return Status::OK();
 }
 
-Status Log::Reserve(LogEntryTypePB type,
-                    gscoped_ptr<LogEntryBatchPB> entry_batch,
-                    LogEntryBatch** reserved_entry) {
-  TRACE_EVENT0("log", "Log::Reserve");
-  DCHECK(reserved_entry != nullptr);
-  {
-    shared_lock<rw_spinlock> l(state_lock_.get_lock());
-    CHECK_EQ(kLogWriting, log_state_);
-  }
-
-  // In DEBUG builds, verify that all of the entries in the batch match the specified type.
-  // In non-debug builds the foreach loop gets optimized out.
-  #ifndef NDEBUG
-  for (const LogEntryPB& entry : entry_batch->entry()) {
-    DCHECK_EQ(entry.type(), type) << "Bad batch: " << SecureDebugString(*entry_batch);
-  }
-  #endif
-
-  int num_ops = entry_batch->entry_size();
-  gscoped_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
-      type, std::move(entry_batch), num_ops));
-  new_entry_batch->MarkReserved();
+Status Log::CreateBatchFromPB(LogEntryTypePB type,
+                              unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                              unique_ptr<LogEntryBatch>* entry_batch) {
+  int num_ops = entry_batch_pb->entry_size();
+  unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
+      type, std::move(entry_batch_pb), num_ops));
+  new_entry_batch->Serialize();
+  TRACE("Serialized $0 byte log entry", new_entry_batch->total_size_bytes());
 
-  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(new_entry_batch.get()))) {
-    return kLogShutdownStatus;
-  }
-
-  // Release the memory back to the caller: this will be freed when
-  // the entry is removed from the queue.
-  //
-  // TODO (perf) Use a ring buffer instead of a blocking queue and set
-  // 'reserved_entry' to a pre-allocated slot in the buffer.
-  *reserved_entry = new_entry_batch.release();
+  *entry_batch = std::move(new_entry_batch);
   return Status::OK();
 }
 
-void Log::AsyncAppend(LogEntryBatch* entry_batch, const StatusCallback& callback) {
+Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallback& callback) {
   TRACE_EVENT0("log", "Log::AsyncAppend");
-  {
-    shared_lock<rw_spinlock> l(state_lock_.get_lock());
-    CHECK_EQ(kLogWriting, log_state_);
-  }
 
-  entry_batch->Serialize();
   entry_batch->set_callback(callback);
-  TRACE("Serialized $0 byte log entry", entry_batch->total_size_bytes());
-  TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch);
-  entry_batch->MarkReady();
+  TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
+  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()))) {
+    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
+    return kLogShutdownStatus;
+  }
+  entry_batch.release();
+  return Status::OK();
 }
 
 Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
                                   const StatusCallback& callback) {
-  gscoped_ptr<LogEntryBatchPB> batch;
-  CreateBatchFromAllocatedOperations(replicates, &batch);
-
-  LogEntryBatch* reserved_entry_batch;
-  RETURN_NOT_OK(Reserve(REPLICATE, std::move(batch), &reserved_entry_batch));
-  // If we're able to reserve set the vector of replicate scoped ptrs in
-  // the LogEntryBatch. This will make sure there's a reference for each
-  // replicate while we're appending.
-  reserved_entry_batch->SetReplicates(replicates);
+  unique_ptr<LogEntryBatchPB> batch_pb = CreateBatchFromAllocatedOperations(replicates);
 
-  AsyncAppend(reserved_entry_batch, callback);
-  return Status::OK();
+  unique_ptr<LogEntryBatch> batch;
+  RETURN_NOT_OK(CreateBatchFromPB(REPLICATE, std::move(batch_pb), &batch));
+  batch->SetReplicates(replicates);
+  return AsyncAppend(std::move(batch), callback);
 }
 
 Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
                               const StatusCallback& callback) {
   MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
 
-  gscoped_ptr<LogEntryBatchPB> batch(new LogEntryBatchPB);
-  LogEntryPB* entry = batch->add_entry();
+  unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
+  LogEntryPB* entry = batch_pb->add_entry();
   entry->set_type(COMMIT);
   entry->set_allocated_commit(commit_msg.release());
 
-  LogEntryBatch* reserved_entry_batch;
-  RETURN_NOT_OK(Reserve(COMMIT, std::move(batch), &reserved_entry_batch));
-
-  AsyncAppend(reserved_entry_batch, callback);
+  unique_ptr<LogEntryBatch> entry_batch;
+  RETURN_NOT_OK(CreateBatchFromPB(COMMIT, std::move(batch_pb), &entry_batch));
+  AsyncAppend(std::move(entry_batch), callback);
   return Status::OK();
 }
 
@@ -696,12 +663,10 @@ Status Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
 }
 
 Status Log::Append(LogEntryPB* entry) {
-  gscoped_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
+  unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
   entry_batch_pb->mutable_entry()->AddAllocated(entry);
   LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
-  entry_batch.state_ = LogEntryBatch::kEntryReserved;
   entry_batch.Serialize();
-  entry_batch.state_ = LogEntryBatch::kEntryReady;
   Status s = DoAppend(&entry_batch);
   if (s.ok()) {
     s = Sync();
@@ -713,12 +678,12 @@ Status Log::Append(LogEntryPB* entry) {
 Status Log::WaitUntilAllFlushed() {
   // In order to make sure we empty the queue we need to use
   // the async api.
-  gscoped_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
+  unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
   entry_batch->add_entry()->set_type(log::FLUSH_MARKER);
-  LogEntryBatch* reserved_entry_batch;
-  RETURN_NOT_OK(Reserve(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch));
+  unique_ptr<LogEntryBatch> reserved_entry_batch;
+  RETURN_NOT_OK(CreateBatchFromPB(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch));
   Synchronizer s;
-  AsyncAppend(reserved_entry_batch, s.AsStatusCallback());
+  AsyncAppend(std::move(reserved_entry_batch), s.AsStatusCallback());
   return s.Wait();
 }
 
@@ -1022,14 +987,14 @@ Log::~Log() {
 }
 
 LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
-                             gscoped_ptr<LogEntryBatchPB> entry_batch_pb, size_t count)
+                             unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                             size_t count)
     : type_(type),
       entry_batch_pb_(std::move(entry_batch_pb)),
       total_size_bytes_(
           PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER) ?
           0 : entry_batch_pb_->ByteSize()),
-      count_(count),
-      state_(kEntryInitialized) {
+      count_(count) {
 }
 
 LogEntryBatch::~LogEntryBatch() {
@@ -1042,36 +1007,16 @@ LogEntryBatch::~LogEntryBatch() {
   }
 }
 
-void LogEntryBatch::MarkReserved() {
-  DCHECK_EQ(state_, kEntryInitialized);
-  ready_lock_.Lock();
-  state_ = kEntryReserved;
-}
-
 void LogEntryBatch::Serialize() {
-  DCHECK_EQ(state_, kEntryReserved);
-  buffer_.clear();
+  DCHECK_EQ(buffer_.size(), 0);
   // FLUSH_MARKER LogEntries are markers and are not serialized.
   if (PREDICT_FALSE(count() == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER)) {
-    state_ = kEntrySerialized;
     return;
   }
   buffer_.reserve(total_size_bytes_);
   pb_util::AppendToString(*entry_batch_pb_, &buffer_);
-  state_ = kEntrySerialized;
 }
 
-void LogEntryBatch::MarkReady() {
-  DCHECK_EQ(state_, kEntrySerialized);
-  state_ = kEntryReady;
-  ready_lock_.Unlock();
-}
-
-void LogEntryBatch::WaitForReady() {
-  ready_lock_.Lock();
-  DCHECK_EQ(state_, kEntryReady);
-  ready_lock_.Unlock();
-}
 
 }  // namespace log
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2ea24b/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 5caa347..9048b38 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -58,24 +58,12 @@ typedef BlockingQueue<LogEntryBatch*, LogEntryBatchLogicalSize> LogEntryBatchQue
 // Kudu as a normal Write Ahead Log and also plays the role of persistent
 // storage for the consensus state machine.
 //
-// Note: This class is not thread safe, the caller is expected to synchronize
-// Log::Reserve() and Log::Append() calls.
-//
 // Log uses group commit to improve write throughput and latency
-// without compromising ordering and durability guarantees.
-//
-// To add operations to the log, the caller must obtain the lock and
-// call Reserve() with the collection of operations to be added. Then,
-// the caller may release the lock and call AsyncAppend(). Reserve()
-// reserves a slot on a queue for the log entry; AsyncAppend()
-// indicates that the entry in the slot is safe to write to disk and
-// adds a callback that will be invoked once the entry is written and
-// synchronized to disk.
-//
-// For sample usage see mt-log-test.cc
+// without compromising ordering and durability guarantees. A single background
+// thread per Log instance is responsible for accumulating pending writes
+// and flushing them to the log.
 //
-// Methods on this class are _not_ thread-safe and must be externally
-// synchronized unless otherwise noted.
+// This class is thread-safe unless otherwise noted.
 //
 // Note: The Log needs to be Close()d before any log-writing class is
 // destroyed, otherwise the Log might hold references to these classes
@@ -99,24 +87,6 @@ class Log : public RefCountedThreadSafe<Log> {
 
   ~Log();
 
-  // Reserves a spot in the log's queue for 'entry_batch'.
-  //
-  // 'reserved_entry' is initialized by this method and any resources
-  // associated with it will be released in AsyncAppend().  In order
-  // to ensure correct ordering of operations across multiple threads,
-  // calls to this method must be externally synchronized.
-  //
-  // WARNING: the caller _must_ call AsyncAppend() or else the log
-  // will "stall" and will never be able to make forward progress.
-  Status Reserve(LogEntryTypePB type,
-                 gscoped_ptr<LogEntryBatchPB> entry_batch,
-                 LogEntryBatch** reserved_entry);
-
-  // Asynchronously appends 'entry_batch' to the log. Once the append
-  // completes and is synced, 'callback' will be invoked.
-  void AsyncAppend(LogEntryBatch* entry_batch,
-                   const StatusCallback& callback);
-
   // Synchronously append a new entry to the log.
   // Log does not take ownership of the passed 'entry'.
   Status Append(LogEntryPB* entry);
@@ -281,6 +251,15 @@ class Log : public RefCountedThreadSafe<Log> {
   // Make segments roll over.
   Status RollOver();
 
+  static Status CreateBatchFromPB(LogEntryTypePB type,
+                                  std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                                  std::unique_ptr<LogEntryBatch>* entry_batch);
+
+  // Asynchronously appends 'entry_batch' to the log. Once the append
+  // completes and is synced, 'callback' will be invoked.
+  Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch,
+                     const StatusCallback& callback);
+
   // Writes the footer and closes the current segment.
   Status CloseCurrentSegment();
 
@@ -300,9 +279,7 @@ class Log : public RefCountedThreadSafe<Log> {
   Status PreAllocateNewSegment();
 
   // Writes serialized contents of 'entry' to the log. Called inside
-  // AppenderThread. If 'caller_owns_operation' is true, then the
-  // 'operation' field of the entry will be released after the entry
-  // is appended.
+  // AppenderThread.
   Status DoAppend(LogEntryBatch* entry_batch);
 
   // Update footer_builder_ to reflect the log indexes seen in 'batch'.
@@ -391,8 +368,8 @@ class Log : public RefCountedThreadSafe<Log> {
   // The maximum segment size, in bytes.
   uint64_t max_segment_size_;
 
-  // The queue used to communicate between the thread calling
-  // Reserve() and the Log Appender thread
+  // The queue used to communicate between the threads appending operations
+  // and the thread which actually appends them to the log.
   LogEntryBatchQueue entry_batch_queue_;
 
   // Thread writing to the log
@@ -468,7 +445,8 @@ class LogEntryBatch {
   friend class MultiThreadedLogTest;
 
   LogEntryBatch(LogEntryTypePB type,
-                gscoped_ptr<LogEntryBatchPB> entry_batch_pb, size_t count);
+                std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+                size_t count);
 
   // Serializes contents of the entry to an internal buffer.
   void Serialize();
@@ -485,27 +463,10 @@ class LogEntryBatch {
     return callback_;
   }
 
-  bool failed_to_append() const {
-    return state_ == kEntryFailedToAppend;
-  }
-
-  void set_failed_to_append() {
-    state_ = kEntryFailedToAppend;
-  }
-
-  // Mark the entry as reserved, but not yet ready to write to the log.
-  void MarkReserved();
-
-  // Mark the entry as ready to write to log.
-  void MarkReady();
-
-  // Wait (currently, by spinning on ready_lock_) until ready.
-  void WaitForReady();
 
   // Returns a Slice representing the serialized contents of the
   // entry.
   Slice data() const {
-    DCHECK_EQ(state_, kEntryReady);
     return Slice(buffer_);
   }
 
@@ -533,7 +494,7 @@ class LogEntryBatch {
   const LogEntryTypePB type_;
 
   // Contents of the log entries that will be written to disk.
-  gscoped_ptr<LogEntryBatchPB> entry_batch_pb_;
+  std::unique_ptr<LogEntryBatchPB> entry_batch_pb_;
 
    // Total size in bytes of all entries
   const uint32_t total_size_bytes_;
@@ -551,26 +512,10 @@ class LogEntryBatch {
   // synced to disk.
   StatusCallback callback_;
 
-  // Used to coordinate the synchronizer thread and the caller
-  // thread: this lock starts out locked, and is unlocked by the
-  // caller thread (i.e., inside AppendThread()) once the entry is
-  // fully initialized (once the callback is set and data is
-  // serialized)
-  base::SpinLock ready_lock_;
-
   // Buffer to which 'phys_entries_' are serialized by call to
   // 'Serialize()'
   faststring buffer_;
 
-  enum LogEntryState {
-    kEntryInitialized,
-    kEntryReserved,
-    kEntrySerialized,
-    kEntryReady,
-    kEntryFailedToAppend
-  };
-  LogEntryState state_;
-
   DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2ea24b/src/kudu/consensus/log_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index e3f4413..c31c58a 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -842,16 +842,16 @@ Status WritableLogSegment::WriteEntryBatch(const Slice& data,
 }
 
 
-void CreateBatchFromAllocatedOperations(const vector<consensus::ReplicateRefPtr>& msgs,
-                                        gscoped_ptr<LogEntryBatchPB>* batch) {
-  gscoped_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
+unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
+    const vector<consensus::ReplicateRefPtr>& msgs) {
+  unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
   entry_batch->mutable_entry()->Reserve(msgs.size());
   for (const auto& msg : msgs) {
     LogEntryPB* entry_pb = entry_batch->add_entry();
     entry_pb->set_type(log::REPLICATE);
     entry_pb->set_allocated_replicate(msg->get());
   }
-  batch->reset(entry_batch.release());
+  return entry_batch;
 }
 
 bool IsLogFileName(const string& fname) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4d2ea24b/src/kudu/consensus/log_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 54c8698..c52e89b 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -473,12 +473,10 @@ class WritableLogSegment {
   DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
 };
 
-// Sets 'batch' to a newly created batch that contains the pre-allocated
+// Return a newly created batch that contains the pre-allocated
 // ReplicateMsgs in 'msgs'.
-// We use C-style passing here to avoid having to allocate a vector
-// in some hot paths.
-void CreateBatchFromAllocatedOperations(const std::vector<consensus::ReplicateRefPtr>& msgs,
-                                        gscoped_ptr<LogEntryBatchPB>* batch);
+std::unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
+    const std::vector<consensus::ReplicateRefPtr>& msgs);
 
 // Checks if 'fname' is a correctly formatted name of log segment file.
 bool IsLogFileName(const std::string& fname);


[2/2] kudu git commit: fs: move LBM tests to new test file

Posted by mp...@apache.org.
fs: move LBM tests to new test file

They don't share much with block_manager-test, and I plan on adding more, so
this separation makes sense.

The only substantive change was to the ReopenBlockManager() signature in the
new test file.

Change-Id: I09c3970575134c630994d7dd275b436c5b2fa2bc
Reviewed-on: http://gerrit.cloudera.org:8080/6485
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/86da259a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/86da259a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/86da259a

Branch: refs/heads/master
Commit: 86da259a6867674749209d7e07f89b4b9820cb42
Parents: 4d2ea24
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Mar 24 16:59:26 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Sun Mar 26 00:28:03 2017 +0000

----------------------------------------------------------------------
 src/kudu/fs/CMakeLists.txt            |   4 +
 src/kudu/fs/block_manager-test.cc     | 692 +----------------------------
 src/kudu/fs/log_block_manager-test.cc | 692 +++++++++++++++++++++++++++++
 3 files changed, 697 insertions(+), 691 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/86da259a/src/kudu/fs/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/fs/CMakeLists.txt b/src/kudu/fs/CMakeLists.txt
index 3f72bb6..18e9b57 100644
--- a/src/kudu/fs/CMakeLists.txt
+++ b/src/kudu/fs/CMakeLists.txt
@@ -46,3 +46,7 @@ ADD_KUDU_TEST(block_manager-test)
 ADD_KUDU_TEST(block_manager_util-test)
 ADD_KUDU_TEST(block_manager-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(fs_manager-test)
+if (NOT APPLE)
+  # Will only pass on Linux.
+  ADD_KUDU_TEST(log_block_manager-test)
+endif()

http://git-wip-us.apache.org/repos/asf/kudu/blob/86da259a/src/kudu/fs/block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/block_manager-test.cc b/src/kudu/fs/block_manager-test.cc
index c086067..9f66962 100644
--- a/src/kudu/fs/block_manager-test.cc
+++ b/src/kudu/fs/block_manager-test.cc
@@ -17,19 +17,17 @@
 
 #include <algorithm>
 #include <memory>
-#include <unordered_map>
-#include <unordered_set>
 #include <string>
 #include <vector>
 
 #include "kudu/fs/file_block_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
-#include "kudu/util/env_util.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
@@ -39,13 +37,9 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
-using kudu::env_util::ReadFully;
-using kudu::pb_util::ReadablePBContainerFile;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
-using std::unordered_map;
-using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -54,7 +48,6 @@ DECLARE_uint64(log_container_max_size);
 
 DECLARE_int64(fs_data_dirs_reserved_bytes);
 DECLARE_int64(disk_reserved_bytes_free_for_testing);
-DECLARE_int64(log_container_max_blocks);
 
 DECLARE_int32(fs_data_dirs_full_disk_cache_seconds);
 
@@ -70,12 +63,6 @@ METRIC_DECLARE_counter(block_manager_total_readable_blocks);
 METRIC_DECLARE_counter(block_manager_total_bytes_written);
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
 
-// Log block manager metrics.
-METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
-METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
-METRIC_DECLARE_counter(log_block_manager_containers);
-METRIC_DECLARE_counter(log_block_manager_full_containers);
-
 // Data directory metrics.
 METRIC_DECLARE_gauge_uint64(data_dirs_full);
 
@@ -127,38 +114,8 @@ class BlockManagerTest : public KuduTest {
     return bm_->Open();
   }
 
-  void GetOnlyContainerDataFile(string* data_file) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and one container file pair.
-    string container_data_filename;
-    vector<string> children;
-    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(6, children.size());
-    for (const string& child : children) {
-      if (HasSuffixString(child, ".data")) {
-        ASSERT_TRUE(container_data_filename.empty());
-        container_data_filename = JoinPathSegments(GetTestDataDirectory(), child);
-        break;
-      }
-    }
-    ASSERT_FALSE(container_data_filename.empty());
-    *data_file = container_data_filename;
-  }
-
-  void AssertNumContainers(int expected_num_containers) {
-    // The expected directory contents are dot, dotdot, test metadata, instance
-    // file, and a file pair per container.
-    vector<string> children;
-    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
-    ASSERT_EQ(4 + (2 * expected_num_containers), children.size());
-  }
-
   void RunMultipathTest(const vector<string>& paths);
 
-  void RunLogMetricsTest();
-
-  void RunLogContainerPreallocationTest();
-
   void RunMemTrackerTest();
 
   gscoped_ptr<T> bm_;
@@ -234,143 +191,6 @@ void BlockManagerTest<LogBlockManager>::RunMultipathTest(const vector<string>& p
 }
 
 template <>
-void BlockManagerTest<FileBlockManager>::RunLogMetricsTest() {
-  LOG(INFO) << "Test skipped; wrong block manager";
-}
-
-static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
-                            int bytes_under_management, int blocks_under_management,
-                            int containers, int full_containers) {
-  ASSERT_EQ(bytes_under_management, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_bytes_under_management)
-                .get())->value());
-  ASSERT_EQ(blocks_under_management, down_cast<AtomicGauge<uint64_t>*>(
-                entity->FindOrNull(METRIC_log_block_manager_blocks_under_management)
-                .get())->value());
-  ASSERT_EQ(containers, down_cast<Counter*>(
-                entity->FindOrNull(METRIC_log_block_manager_containers)
-                .get())->value());
-  ASSERT_EQ(full_containers, down_cast<Counter*>(
-                entity->FindOrNull(METRIC_log_block_manager_full_containers)
-                .get())->value());
-}
-
-template <>
-void BlockManagerTest<LogBlockManager>::RunLogMetricsTest() {
-  MetricRegistry registry;
-  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
-  ASSERT_OK(ReopenBlockManager(entity,
-                               shared_ptr<MemTracker>(),
-                               { test_dir_ },
-                               false));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
-
-  // Lower the max container size so that we can more easily test full
-  // container metrics.
-  FLAGS_log_container_max_size = 1024;
-
-  // One block --> one container.
-  unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 1, 0));
-
-  // And when the block is closed, it becomes "under management".
-  ASSERT_OK(writer->Close());
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 1, 0));
-
-  // Create 10 blocks concurrently. We reuse the existing container and
-  // create 9 new ones. All of them get filled.
-  BlockId saved_id;
-  {
-    Random rand(SeedRandom());
-    ScopedWritableBlockCloser closer;
-    for (int i = 0; i < 10; i++) {
-      unique_ptr<WritableBlock> b;
-      ASSERT_OK(bm_->CreateBlock(&b));
-      if (saved_id.IsNull()) {
-        saved_id = b->id();
-      }
-      uint8_t data[1024];
-      for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
-        data[i] = rand.Next();
-      }
-      b->Append(Slice(data, sizeof(data)));
-      closer.AddBlock(std::move(b));
-    }
-    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 10, 0));
-
-    // Only when the blocks are closed are the containers considered full.
-    ASSERT_OK(closer.CloseBlocks());
-    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 10 * 1024, 11, 10, 10));
-  }
-
-  // Reopen the block manager and test the metrics. They're all based on
-  // persistent information so they should be the same.
-  MetricRegistry new_registry;
-  scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
-  ASSERT_OK(ReopenBlockManager(new_entity,
-                               shared_ptr<MemTracker>(),
-                               { test_dir_ },
-                               false));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
-
-  // Delete a block. Its contents should no longer be under management.
-  ASSERT_OK(bm_->DeleteBlock(saved_id));
-  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 9 * 1024, 10, 10, 10));
-}
-
-template <>
-void BlockManagerTest<FileBlockManager>::RunLogContainerPreallocationTest() {
-  LOG(INFO) << "Test skipped; wrong block manager";
-}
-
-template <>
-void BlockManagerTest<LogBlockManager>::RunLogContainerPreallocationTest() {
-  string kTestData = "test data";
-
-  // For this test to work properly, the preallocation window has to be at
-  // least three times the size of the test data.
-  ASSERT_GE(FLAGS_log_container_preallocate_bytes, kTestData.size() * 3);
-
-  // Create a block with some test data. This should also trigger
-  // preallocation of the container, provided it's supported by the kernel.
-  unique_ptr<WritableBlock> written_block;
-  ASSERT_OK(bm_->CreateBlock(&written_block));
-  ASSERT_OK(written_block->Append(kTestData));
-  ASSERT_OK(written_block->Close());
-
-  // We expect the container size to be equal to the preallocation amount,
-  // which we know is greater than the test data size.
-  string container_data_filename;
-  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
-  uint64_t size;
-  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
-  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
-
-  // Upon writing a second block, we'd expect the container to remain the same
-  // size.
-  ASSERT_OK(bm_->CreateBlock(&written_block));
-  ASSERT_OK(written_block->Append(kTestData));
-  ASSERT_OK(written_block->Close());
-  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
-  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
-  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
-
-  // Now reopen the block manager and create another block. The block manager
-  // should be smart enough to reuse the previously preallocated amount.
-  ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                               shared_ptr<MemTracker>(),
-                               { test_dir_ },
-                               false));
-  ASSERT_OK(bm_->CreateBlock(&written_block));
-  ASSERT_OK(written_block->Append(kTestData));
-  ASSERT_OK(written_block->Close());
-  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
-  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
-  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
-}
-
-template <>
 void BlockManagerTest<FileBlockManager>::RunMemTrackerTest() {
   shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test tracker");
   ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
@@ -753,368 +573,10 @@ TYPED_TEST(BlockManagerTest, MetricsTest) {
   }
 }
 
-TYPED_TEST(BlockManagerTest, LogMetricsTest) {
-  ASSERT_NO_FATAL_FAILURE(this->RunLogMetricsTest());
-}
-
-TYPED_TEST(BlockManagerTest, LogContainerPreallocationTest) {
-  ASSERT_NO_FATAL_FAILURE(this->RunLogContainerPreallocationTest());
-}
-
 TYPED_TEST(BlockManagerTest, MemTrackerTest) {
   ASSERT_NO_FATAL_FAILURE(this->RunMemTrackerTest());
 }
 
-// LogBlockManager-specific tests.
-class LogBlockManagerTest : public BlockManagerTest<LogBlockManager> {
-};
-
-// Regression test for KUDU-1190, a crash at startup when a block ID has been
-// reused.
-TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  // Typically, the LBM starts with a random block ID when running as a
-  // gtest. In this test, we want to control the block IDs.
-  bm_->next_block_id_.Store(1);
-
-  vector<BlockId> block_ids;
-
-  // Create 4 containers, with the first four block IDs in the sequence.
-  {
-    ScopedWritableBlockCloser closer;
-    for (int i = 0; i < 4; i++) {
-      unique_ptr<WritableBlock> writer;
-      ASSERT_OK(bm_->CreateBlock(&writer));
-      block_ids.push_back(writer->id());
-      closer.AddBlock(std::move(writer));
-    }
-    ASSERT_OK(closer.CloseBlocks());
-  }
-
-  // Create one more block, which should reuse the first container.
-  {
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
-    ASSERT_OK(writer->Close());
-  }
-
-  ASSERT_EQ(4, bm_->all_containers_.size());
-
-  // Delete the original blocks.
-  for (const BlockId& b : block_ids) {
-    ASSERT_OK(bm_->DeleteBlock(b));
-  }
-
-  // Reset the block ID sequence and re-create new blocks which should reuse the same
-  // block IDs. This isn't allowed in current versions of Kudu, but older versions
-  // could produce this situation, and we still need to handle it on startup.
-  bm_->next_block_id_.Store(1);
-  for (int i = 0; i < 4; i++) {
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
-    ASSERT_EQ(writer->id(), block_ids[i]);
-    ASSERT_OK(writer->Close());
-  }
-
-  // Now we have 4 containers with the following metadata:
-  //   1: CREATE(1) CREATE (5) DELETE(1) CREATE(4)
-  //   2: CREATE(2) DELETE(2) CREATE(1)
-  //   3: CREATE(3) DELETE(3) CREATE(2)
-  //   4: CREATE(4) DELETE(4) CREATE(3)
-
-  // Re-open the block manager and make sure it can deal with this case where
-  // block IDs have been reused.
-  ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                               shared_ptr<MemTracker>(),
-                               { this->test_dir_ },
-                               false));
-}
-
-// Test partial record at end of metadata file. See KUDU-1377.
-// The idea behind this test is that we should tolerate one partial record at
-// the end of a given container metadata file, since we actively append a
-// record to a container metadata file when a new block is created or deleted.
-// A system crash or disk-full event can result in a partially-written metadata
-// record. Ignoring a trailing, partial (not corrupt) record is safe, so long
-// as we only consider a container valid if there is at most one trailing
-// partial record. If any other metadata record is somehow incomplete or
-// corrupt, we consider that an error and the entire container is considered
-// corrupted.
-//
-// Note that we rely on filesystem integrity to ensure that we do not lose
-// trailing, fsync()ed metadata.
-TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  // Create several blocks.
-  vector<BlockId> created_blocks;
-  BlockId last_block_id;
-  for (int i = 0; i < 4; i++) {
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
-    last_block_id = writer->id();
-    created_blocks.push_back(last_block_id);
-    ASSERT_OK(writer->Close());
-  }
-  vector<BlockId> block_ids;
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(4, block_ids.size());
-  unique_ptr<ReadableBlock> block;
-  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
-  ASSERT_OK(block->Close());
-
-  // Start corrupting the metadata file in different ways.
-
-  string path = LogBlockManager::ContainerPathForTests(bm_->all_containers_.front());
-  string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
-  string data_path = path + LogBlockManager::kContainerDataFileSuffix;
-
-  uint64_t good_meta_size;
-  ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size));
-
-  // First, add an extra byte to the end of the metadata file. This makes the
-  // trailing "record" of the metadata file corrupt, but doesn't cause data
-  // loss. The result is that the container will automatically truncate the
-  // metadata file back to its correct size.
-  {
-    RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
-    unique_ptr<RWFile> file;
-    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
-    ASSERT_OK(file->Truncate(good_meta_size + 1));
-  }
-
-  uint64_t cur_meta_size;
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  ASSERT_EQ(good_meta_size + 1, cur_meta_size);
-
-  // Reopen the metadata file. We will still see all of our blocks. The size of
-  // the metadata file will be restored back to its previous value.
-  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                     shared_ptr<MemTracker>(),
-                                     { this->test_dir_ },
-                                     false));
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(4, block_ids.size());
-  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
-  ASSERT_OK(block->Close());
-
-  // Check that the file was truncated back to its previous size by the system.
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  ASSERT_EQ(good_meta_size, cur_meta_size);
-
-  // Delete the first block we created. This necessitates writing to the
-  // metadata file of the originally-written container, since we append a
-  // delete record to the metadata.
-  ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(3, block_ids.size());
-
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  good_meta_size = cur_meta_size;
-
-  // Add a new block, increasing the size of the container metadata file.
-  {
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
-    last_block_id = writer->id();
-    created_blocks.push_back(last_block_id);
-    ASSERT_OK(writer->Close());
-  }
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(4, block_ids.size());
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  ASSERT_GT(cur_meta_size, good_meta_size);
-  uint64_t prev_good_meta_size = good_meta_size; // Store previous size.
-  good_meta_size = cur_meta_size;
-
-  // Now, truncate the metadata file so that we lose the last valid record.
-  // This will result in the loss of a block record, therefore we will observe
-  // data loss, however it will look like a failed partial write.
-  {
-    RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
-    unique_ptr<RWFile> file;
-    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
-    ASSERT_OK(file->Truncate(good_meta_size - 1));
-  }
-
-  // Reopen the truncated metadata file. We will not find all of our blocks.
-  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                     shared_ptr<MemTracker>(),
-                                     { this->test_dir_ },
-                                     false));
-
-  // Because the last record was a partial record on disk, the system should
-  // have assumed that it was an incomplete write and truncated the metadata
-  // file back to the previous valid record. Let's verify that that's the case.
-  good_meta_size = prev_good_meta_size;
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  ASSERT_EQ(good_meta_size, cur_meta_size);
-
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(3, block_ids.size());
-  Status s = bm_->OpenBlock(last_block_id, &block);
-  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "Can't find block");
-
-  // Add a new block, increasing the size of the container metadata file.
-  {
-    unique_ptr<WritableBlock> writer;
-    ASSERT_OK(bm_->CreateBlock(&writer));
-    last_block_id = writer->id();
-    created_blocks.push_back(last_block_id);
-    ASSERT_OK(writer->Close());
-  }
-
-  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
-  ASSERT_EQ(4, block_ids.size());
-  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
-  ASSERT_OK(block->Close());
-
-  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
-  ASSERT_GT(cur_meta_size, good_meta_size);
-  good_meta_size = cur_meta_size;
-
-  // Ensure that we only ever created a single container.
-  ASSERT_EQ(1, bm_->all_containers_.size());
-  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size());
-  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
-
-  // Find location of 2nd record in metadata file and corrupt it.
-  // This is an unrecoverable error because it's in the middle of the file.
-  unique_ptr<RandomAccessFile> meta_file;
-  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
-  ReadablePBContainerFile pb_reader(std::move(meta_file));
-  ASSERT_OK(pb_reader.Open());
-  BlockRecordPB record;
-  ASSERT_OK(pb_reader.ReadNextPB(&record));
-  uint64_t offset = pb_reader.offset();
-
-  uint64_t latest_meta_size;
-  ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
-  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
-  Slice result;
-  gscoped_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
-  ASSERT_OK(ReadFully(meta_file.get(), 0, latest_meta_size, &result, scratch.get()));
-  string data = result.ToString();
-  // Flip the high bit of the length field, which is a 4-byte little endian
-  // unsigned integer. This will cause the length field to represent a large
-  // value and also cause the length checksum not to validate.
-  data[offset + 3] ^= 1 << 7;
-  unique_ptr<WritableFile> writable_file;
-  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
-  ASSERT_OK(writable_file->Append(data));
-  ASSERT_OK(writable_file->Close());
-
-  // Now try to reopen the container.
-  // This should look like a bad checksum, and it's not recoverable.
-  s = this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                               shared_ptr<MemTracker>(),
-                               { this->test_dir_ },
-                               false);
-  ASSERT_TRUE(s.IsCorruption());
-  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
-
-  // Now truncate both the data and metadata files.
-  // This should be recoverable. See KUDU-668.
-  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
-  ASSERT_OK(writable_file->Close());
-  ASSERT_OK(env_->NewWritableFile(data_path, &writable_file));
-  ASSERT_OK(writable_file->Close());
-
-  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                     shared_ptr<MemTracker>(),
-                                     { this->test_dir_ },
-                                     false));
-}
-
-// Regression test for a crash when a container's append offset exceeded its
-// preallocation offset.
-TEST_F(LogBlockManagerTest, TestAppendExceedsPreallocation) {
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  FLAGS_log_container_preallocate_bytes = 1;
-
-  // Create a container, preallocate it by one byte, and append more than one.
-  unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
-  ASSERT_OK(writer->Append("hello world"));
-  ASSERT_OK(writer->Close());
-
-  // On second append, don't crash just because the append offset is ahead of
-  // the preallocation offset!
-  ASSERT_OK(bm_->CreateBlock(&writer));
-  ASSERT_OK(writer->Append("hello world"));
-}
-
-TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  // Ensure preallocation window is greater than the container size itself.
-  FLAGS_log_container_max_size = 1024 * 1024;
-  FLAGS_log_container_preallocate_bytes = 32 * 1024 * 1024;
-
-  // Fill up one container.
-  unique_ptr<WritableBlock> writer;
-  ASSERT_OK(bm_->CreateBlock(&writer));
-  unique_ptr<uint8_t[]> data(new uint8_t[FLAGS_log_container_max_size]);
-  memset(data.get(), 0, FLAGS_log_container_max_size);
-  ASSERT_OK(writer->Append({ data.get(), FLAGS_log_container_max_size } ));
-  string fname;
-  NO_FATALS(GetOnlyContainerDataFile(&fname));
-  uint64_t size_after_append;
-  ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_append));
-  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size_after_append);
-
-  // Close it. The extra preallocated space should be truncated off the file.
-  ASSERT_OK(writer->Close());
-  uint64_t size_after_close;
-  ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_close));
-  ASSERT_EQ(FLAGS_log_container_max_size, size_after_close);
-
-  // Now test the same startup behavior by artificially growing the file
-  // and reopening the block manager.
-  //
-  // Try preallocating in two ways: once with a change to the file size and
-  // once without. The second way serves as a proxy for XFS's speculative
-  // preallocation behavior, described in KUDU-1856.
-  for (RWFile::PreAllocateMode mode : {RWFile::CHANGE_FILE_SIZE,
-                                       RWFile::DONT_CHANGE_FILE_SIZE}) {
-    LOG(INFO) << "Pass " << mode;
-    unique_ptr<RWFile> data_file;
-    RWFileOptions opts;
-    opts.mode = Env::OPEN_EXISTING;
-    ASSERT_OK(env_->NewRWFile(opts, fname, &data_file));
-    ASSERT_OK(data_file->PreAllocate(size_after_close, size_after_close, mode));
-    uint64_t size_after_preallocate;
-    ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_preallocate));
-    ASSERT_EQ(size_after_close * 2, size_after_preallocate);
-
-    if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
-      // Some older versions of ext4 (such as on el6) do not appear to truncate
-      // unwritten preallocated space that extends beyond the file size. Let's
-      // coax them by writing a single byte into that space.
-      //
-      // Note: this doesn't invalidate the usefulness of this test, as it's
-      // quite possible for us to have written a little bit of data into XFS's
-      // speculative preallocated area.
-      ASSERT_OK(data_file->Write(size_after_close, "a"));
-    }
-
-    // Now reopen the block manager. It should notice that the container grew
-    // and truncate the extra preallocated space off again.
-    ASSERT_OK(ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                 shared_ptr<MemTracker>(),
-                                 { this->test_dir_ },
-                                 false));
-    uint64_t size_after_reopen;
-    ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_reopen));
-    ASSERT_EQ(FLAGS_log_container_max_size, size_after_reopen);
-  }
-}
-
 TYPED_TEST(BlockManagerTest, TestDiskSpaceCheck) {
   // Reopen the block manager with metrics enabled.
   MetricRegistry registry;
@@ -1302,157 +764,5 @@ TYPED_TEST(BlockManagerTest, TestGetAllBlockIds) {
   ASSERT_EQ(ids, retrieved_ids);
 }
 
-TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
-  // This is a regression test of sorts for KUDU-1508, though it doesn't
-  // actually fail if the fix is missing; it just corrupts the filesystem.
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  static unordered_map<int, int> block_size_to_last_interior_node_block_number =
-     {{1024, 168},
-      {2048, 338},
-      {4096, 680}};
-
-  const int kNumBlocks = 16 * 1024;
-
-  uint64_t fs_block_size;
-  ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size));
-  if (!ContainsKey(block_size_to_last_interior_node_block_number,
-                   fs_block_size)) {
-    LOG(INFO) << Substitute("Filesystem block size is $0, skipping test",
-                            fs_block_size);
-    return;
-  }
-  int last_interior_node_block_number = FindOrDie(
-      block_size_to_last_interior_node_block_number, fs_block_size);
-
-  ASSERT_GE(kNumBlocks, last_interior_node_block_number);
-
-  // Create a bunch of blocks. They should all go in one container (unless
-  // the container becomes full).
-  LOG(INFO) << Substitute("Creating $0 blocks", kNumBlocks);
-  vector<BlockId> ids;
-  for (int i = 0; i < kNumBlocks; i++) {
-    unique_ptr<WritableBlock> block;
-    ASSERT_OK(bm_->CreateBlock(&block));
-    ASSERT_OK(block->Append("aaaa"));
-    ASSERT_OK(block->Close());
-    ids.push_back(block->id());
-  }
-
-  // Delete every other block. In effect, this maximizes the number of extents
-  // in the container by forcing the filesystem to alternate every hole with
-  // a live extent.
-  LOG(INFO) << "Deleting every other block";
-  for (int i = 0; i < ids.size(); i += 2) {
-    ASSERT_OK(bm_->DeleteBlock(ids[i]));
-  }
-
-  // Delete all of the blocks belonging to the interior node. If KUDU-1508
-  // applies, this should corrupt the filesystem.
-  LOG(INFO) << Substitute("Deleting remaining blocks up to block number $0",
-                          last_interior_node_block_number);
-  for (int i = 1; i < last_interior_node_block_number; i += 2) {
-    ASSERT_OK(bm_->DeleteBlock(ids[i]));
-  }
-}
-
-TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64"));
-
-  // no el6 infix
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32"));
-
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1.0.0.el6.x86_64"));
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.33-1.0.0.el6.x86_64"));
-
-  // Make sure it's a numeric sort, not a lexicographic one.
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1000.0.0.el6.x86_64"));
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.100-1.0.0.el6.x86_64"));
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.10.0-1.0.0.el6.x86_64"));
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("10.0.0-1.0.0.el6.x86_64"));
-
-  // Kernels from el6.6, el6.7: buggy
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-504.30.3.el6.x86_64"));
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.el6.x86_64"));
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.1.1.el6.x86_64"));
-
-  // Kernel from el6.8: buggy
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.el6.x86_64"));
-
-  // Kernels from el6.8 update stream before a fix was applied: buggy.
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.11.1.el6.x86_64"));
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.1.el6.x86_64"));
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.2.el6.x86_64"));
-
-  // Kernels from el6.8 update stream after a fix was applied: not buggy.
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.15.1.el6.x86_64"));
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.18.1.el6.x86_64"));
-
-  // Kernel from el6.9 development prior to fix: buggy.
-  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-673.0.0.el6.x86_64"));
-
-  // Kernel from el6.9 development post-fix: not buggy.
-  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-674.0.0.el6.x86_64"));
-}
-
-TEST_F(LogBlockManagerTest, TestLookupBlockLimit) {
-  int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024);
-  int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048);
-  int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096);
-
-  // Test the floor behavior in LookupBlockLimit().
-  for (int i = 0; i < 16384; i++) {
-    if (i < 2048) {
-      ASSERT_EQ(limit_1024, LogBlockManager::LookupBlockLimit(i));
-    } else if (i < 4096) {
-      ASSERT_EQ(limit_2048, LogBlockManager::LookupBlockLimit(i));
-    } else {
-      ASSERT_EQ(limit_4096, LogBlockManager::LookupBlockLimit(i));
-    }
-  }
-}
-
-TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
-  RETURN_NOT_LOG_BLOCK_MANAGER();
-
-  const int kNumBlocks = 1000;
-
-  // Creates 'kNumBlocks' blocks with minimal data.
-  auto create_some_blocks = [&]() -> Status {
-    for (int i = 0; i < kNumBlocks; i++) {
-      unique_ptr<WritableBlock> block;
-      RETURN_NOT_OK(bm_->CreateBlock(&block));
-      RETURN_NOT_OK(block->Append("aaaa"));
-      RETURN_NOT_OK(block->Close());
-    }
-    return Status::OK();
-  };
-
-  // All of these blocks should fit into one container.
-  ASSERT_OK(create_some_blocks());
-  NO_FATALS(AssertNumContainers(1));
-
-  // With a limit imposed, the existing container is immediately full, and we
-  // need a few more to satisfy another 'kNumBlocks' blocks.
-  FLAGS_log_container_max_blocks = 400;
-  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                     shared_ptr<MemTracker>(),
-                                     { GetTestDataDirectory() },
-                                     false));
-  ASSERT_OK(create_some_blocks());
-  NO_FATALS(AssertNumContainers(4));
-
-  // Now remove the limit and create more blocks. They should go into existing
-  // containers, which are now no longer full.
-  FLAGS_log_container_max_blocks = -1;
-  ASSERT_OK(this->ReopenBlockManager(scoped_refptr<MetricEntity>(),
-                                     shared_ptr<MemTracker>(),
-                                     { GetTestDataDirectory() },
-                                     false));
-
-  ASSERT_OK(create_some_blocks());
-  NO_FATALS(AssertNumContainers(4));
-}
-
 } // namespace fs
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/86da259a/src/kudu/fs/log_block_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/fs/log_block_manager-test.cc b/src/kudu/fs/log_block_manager-test.cc
new file mode 100644
index 0000000..f7852d8
--- /dev/null
+++ b/src/kudu/fs/log_block_manager-test.cc
@@ -0,0 +1,692 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <memory>
+#include <unordered_map>
+#include <string>
+#include <vector>
+
+#include "kudu/fs/fs.pb.h"
+#include "kudu/fs/log_block_manager.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/test_util.h"
+
+using kudu::env_util::ReadFully;
+using kudu::pb_util::ReadablePBContainerFile;
+using std::string;
+using std::unique_ptr;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+DECLARE_uint64(log_container_preallocate_bytes);
+DECLARE_uint64(log_container_max_size);
+
+DECLARE_int64(log_container_max_blocks);
+
+// Log block manager metrics.
+METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
+METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
+METRIC_DECLARE_counter(log_block_manager_containers);
+METRIC_DECLARE_counter(log_block_manager_full_containers);
+
+namespace kudu {
+namespace fs {
+
+class LogBlockManagerTest : public KuduTest {
+ public:
+  LogBlockManagerTest() :
+    bm_(CreateBlockManager(scoped_refptr<MetricEntity>())) {
+  }
+
+  void SetUp() override {
+    CHECK_OK(bm_->Create());
+    CHECK_OK(bm_->Open());
+  }
+
+ protected:
+  LogBlockManager* CreateBlockManager(const scoped_refptr<MetricEntity>& metric_entity) {
+    BlockManagerOptions opts;
+    opts.metric_entity = metric_entity;
+    opts.root_paths = { test_dir_ };
+    return new LogBlockManager(env_, opts);
+  }
+
+  Status ReopenBlockManager(
+      const scoped_refptr<MetricEntity>& metric_entity = scoped_refptr<MetricEntity>()) {
+    bm_.reset(CreateBlockManager(metric_entity));
+    return bm_->Open();
+  }
+
+  void GetOnlyContainerDataFile(string* data_file) {
+    // The expected directory contents are dot, dotdot, test metadata, instance
+    // file, and one container file pair.
+    string container_data_filename;
+    vector<string> children;
+    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
+    ASSERT_EQ(6, children.size());
+    for (const string& child : children) {
+      if (HasSuffixString(child, ".data")) {
+        ASSERT_TRUE(container_data_filename.empty());
+        container_data_filename = JoinPathSegments(GetTestDataDirectory(), child);
+        break;
+      }
+    }
+    ASSERT_FALSE(container_data_filename.empty());
+    *data_file = container_data_filename;
+  }
+
+  void AssertNumContainers(int expected_num_containers) {
+    // The expected directory contents are dot, dotdot, test metadata, instance
+    // file, and a file pair per container.
+    vector<string> children;
+    ASSERT_OK(env_->GetChildren(GetTestDataDirectory(), &children));
+    ASSERT_EQ(4 + (2 * expected_num_containers), children.size());
+  }
+
+  unique_ptr<LogBlockManager> bm_;
+};
+
+static void CheckLogMetrics(const scoped_refptr<MetricEntity>& entity,
+                            int bytes_under_management, int blocks_under_management,
+                            int containers, int full_containers) {
+  ASSERT_EQ(bytes_under_management, down_cast<AtomicGauge<uint64_t>*>(
+                entity->FindOrNull(METRIC_log_block_manager_bytes_under_management)
+                .get())->value());
+  ASSERT_EQ(blocks_under_management, down_cast<AtomicGauge<uint64_t>*>(
+                entity->FindOrNull(METRIC_log_block_manager_blocks_under_management)
+                .get())->value());
+  ASSERT_EQ(containers, down_cast<Counter*>(
+                entity->FindOrNull(METRIC_log_block_manager_containers)
+                .get())->value());
+  ASSERT_EQ(full_containers, down_cast<Counter*>(
+                entity->FindOrNull(METRIC_log_block_manager_full_containers)
+                .get())->value());
+}
+
+TEST_F(LogBlockManagerTest, MetricsTest) {
+  MetricRegistry registry;
+  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_server.Instantiate(&registry, "test");
+  ASSERT_OK(ReopenBlockManager(entity));
+  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 0, 0));
+
+  // Lower the max container size so that we can more easily test full
+  // container metrics.
+  FLAGS_log_container_max_size = 1024;
+
+  // One block --> one container.
+  unique_ptr<WritableBlock> writer;
+  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 0, 1, 0));
+
+  // And when the block is closed, it becomes "under management".
+  ASSERT_OK(writer->Close());
+  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 1, 0));
+
+  // Create 10 blocks concurrently. We reuse the existing container and
+  // create 9 new ones. All of them get filled.
+  BlockId saved_id;
+  {
+    Random rand(SeedRandom());
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < 10; i++) {
+      unique_ptr<WritableBlock> b;
+      ASSERT_OK(bm_->CreateBlock(&b));
+      if (saved_id.IsNull()) {
+        saved_id = b->id();
+      }
+      uint8_t data[1024];
+      for (int i = 0; i < sizeof(data); i += sizeof(uint32_t)) {
+        data[i] = rand.Next();
+      }
+      b->Append(Slice(data, sizeof(data)));
+      closer.AddBlock(std::move(b));
+    }
+    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 0, 1, 10, 0));
+
+    // Only when the blocks are closed are the containers considered full.
+    ASSERT_OK(closer.CloseBlocks());
+    ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(entity, 10 * 1024, 11, 10, 10));
+  }
+
+  // Reopen the block manager and test the metrics. They're all based on
+  // persistent information so they should be the same.
+  MetricRegistry new_registry;
+  scoped_refptr<MetricEntity> new_entity = METRIC_ENTITY_server.Instantiate(&new_registry, "test");
+  ASSERT_OK(ReopenBlockManager(new_entity));
+  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 10 * 1024, 11, 10, 10));
+
+  // Delete a block. Its contents should no longer be under management.
+  ASSERT_OK(bm_->DeleteBlock(saved_id));
+  ASSERT_NO_FATAL_FAILURE(CheckLogMetrics(new_entity, 9 * 1024, 10, 10, 10));
+}
+
+TEST_F(LogBlockManagerTest, ContainerPreallocationTest) {
+  string kTestData = "test data";
+
+  // For this test to work properly, the preallocation window has to be at
+  // least three times the size of the test data.
+  ASSERT_GE(FLAGS_log_container_preallocate_bytes, kTestData.size() * 3);
+
+  // Create a block with some test data. This should also trigger
+  // preallocation of the container, provided it's supported by the kernel.
+  unique_ptr<WritableBlock> written_block;
+  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(written_block->Append(kTestData));
+  ASSERT_OK(written_block->Close());
+
+  // We expect the container size to be equal to the preallocation amount,
+  // which we know is greater than the test data size.
+  string container_data_filename;
+  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
+  uint64_t size;
+  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
+  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
+
+  // Upon writing a second block, we'd expect the container to remain the same
+  // size.
+  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(written_block->Append(kTestData));
+  ASSERT_OK(written_block->Close());
+  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
+  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
+  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
+
+  // Now reopen the block manager and create another block. The block manager
+  // should be smart enough to reuse the previously preallocated amount.
+  ASSERT_OK(ReopenBlockManager());
+  ASSERT_OK(bm_->CreateBlock(&written_block));
+  ASSERT_OK(written_block->Append(kTestData));
+  ASSERT_OK(written_block->Close());
+  NO_FATALS(GetOnlyContainerDataFile(&container_data_filename));
+  ASSERT_OK(env_->GetFileSizeOnDisk(container_data_filename, &size));
+  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size);
+}
+
+// Regression test for KUDU-1190, a crash at startup when a block ID has been
+// reused.
+TEST_F(LogBlockManagerTest, TestReuseBlockIds) {
+  // Typically, the LBM starts with a random block ID when running as a
+  // gtest. In this test, we want to control the block IDs.
+  bm_->next_block_id_.Store(1);
+
+  vector<BlockId> block_ids;
+
+  // Create 4 containers, with the first four block IDs in the sequence.
+  {
+    ScopedWritableBlockCloser closer;
+    for (int i = 0; i < 4; i++) {
+      unique_ptr<WritableBlock> writer;
+      ASSERT_OK(bm_->CreateBlock(&writer));
+      block_ids.push_back(writer->id());
+      closer.AddBlock(std::move(writer));
+    }
+    ASSERT_OK(closer.CloseBlocks());
+  }
+
+  // Create one more block, which should reuse the first container.
+  {
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_OK(writer->Close());
+  }
+
+  ASSERT_EQ(4, bm_->all_containers_.size());
+
+  // Delete the original blocks.
+  for (const BlockId& b : block_ids) {
+    ASSERT_OK(bm_->DeleteBlock(b));
+  }
+
+  // Reset the block ID sequence and re-create new blocks which should reuse the same
+  // block IDs. This isn't allowed in current versions of Kudu, but older versions
+  // could produce this situation, and we still need to handle it on startup.
+  bm_->next_block_id_.Store(1);
+  for (int i = 0; i < 4; i++) {
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    ASSERT_EQ(writer->id(), block_ids[i]);
+    ASSERT_OK(writer->Close());
+  }
+
+  // Now we have 4 containers with the following metadata:
+  //   1: CREATE(1) CREATE (5) DELETE(1) CREATE(4)
+  //   2: CREATE(2) DELETE(2) CREATE(1)
+  //   3: CREATE(3) DELETE(3) CREATE(2)
+  //   4: CREATE(4) DELETE(4) CREATE(3)
+
+  // Re-open the block manager and make sure it can deal with this case where
+  // block IDs have been reused.
+  ASSERT_OK(ReopenBlockManager());
+}
+
+// Test partial record at end of metadata file. See KUDU-1377.
+// The idea behind this test is that we should tolerate one partial record at
+// the end of a given container metadata file, since we actively append a
+// record to a container metadata file when a new block is created or deleted.
+// A system crash or disk-full event can result in a partially-written metadata
+// record. Ignoring a trailing, partial (not corrupt) record is safe, so long
+// as we only consider a container valid if there is at most one trailing
+// partial record. If any other metadata record is somehow incomplete or
+// corrupt, we consider that an error and the entire container is considered
+// corrupted.
+//
+// Note that we rely on filesystem integrity to ensure that we do not lose
+// trailing, fsync()ed metadata.
+TEST_F(LogBlockManagerTest, TestMetadataTruncation) {
+  // Create several blocks.
+  vector<BlockId> created_blocks;
+  BlockId last_block_id;
+  for (int i = 0; i < 4; i++) {
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  vector<BlockId> block_ids;
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(4, block_ids.size());
+  unique_ptr<ReadableBlock> block;
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Start corrupting the metadata file in different ways.
+
+  string path = LogBlockManager::ContainerPathForTests(bm_->all_containers_[0]);
+  string metadata_path = path + LogBlockManager::kContainerMetadataFileSuffix;
+  string data_path = path + LogBlockManager::kContainerDataFileSuffix;
+
+  uint64_t good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &good_meta_size));
+
+  // First, add an extra byte to the end of the metadata file. This makes the
+  // trailing "record" of the metadata file corrupt, but doesn't cause data
+  // loss. The result is that the container will automatically truncate the
+  // metadata file back to its correct size.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size + 1));
+  }
+
+  uint64_t cur_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size + 1, cur_meta_size);
+
+  // Reopen the metadata file. We will still see all of our blocks. The size of
+  // the metadata file will be restored back to its previous value.
+  ASSERT_OK(ReopenBlockManager());
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(4, block_ids.size());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  // Check that the file was truncated back to its previous size by the system.
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  // Delete the first block we created. This necessitates writing to the
+  // metadata file of the originally-written container, since we append a
+  // delete record to the metadata.
+  ASSERT_OK(bm_->DeleteBlock(created_blocks[0]));
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(3, block_ids.size());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  good_meta_size = cur_meta_size;
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(4, block_ids.size());
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  uint64_t prev_good_meta_size = good_meta_size; // Store previous size.
+  good_meta_size = cur_meta_size;
+
+  // Now, truncate the metadata file so that we lose the last valid record.
+  // This will result in the loss of a block record, therefore we will observe
+  // data loss, however it will look like a failed partial write.
+  {
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    unique_ptr<RWFile> file;
+    ASSERT_OK(env_->NewRWFile(opts, metadata_path, &file));
+    ASSERT_OK(file->Truncate(good_meta_size - 1));
+  }
+
+  // Reopen the truncated metadata file. We will not find all of our blocks.
+  ASSERT_OK(ReopenBlockManager());
+
+  // Because the last record was a partial record on disk, the system should
+  // have assumed that it was an incomplete write and truncated the metadata
+  // file back to the previous valid record. Let's verify that that's the case.
+  good_meta_size = prev_good_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_EQ(good_meta_size, cur_meta_size);
+
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(3, block_ids.size());
+  Status s = bm_->OpenBlock(last_block_id, &block);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Can't find block");
+
+  // Add a new block, increasing the size of the container metadata file.
+  {
+    unique_ptr<WritableBlock> writer;
+    ASSERT_OK(bm_->CreateBlock(&writer));
+    last_block_id = writer->id();
+    created_blocks.push_back(last_block_id);
+    ASSERT_OK(writer->Close());
+  }
+
+  ASSERT_OK(bm_->GetAllBlockIds(&block_ids));
+  ASSERT_EQ(4, block_ids.size());
+  ASSERT_OK(bm_->OpenBlock(last_block_id, &block));
+  ASSERT_OK(block->Close());
+
+  ASSERT_OK(env_->GetFileSize(metadata_path, &cur_meta_size));
+  ASSERT_GT(cur_meta_size, good_meta_size);
+  good_meta_size = cur_meta_size;
+
+  // Ensure that we only ever created a single container.
+  ASSERT_EQ(1, bm_->all_containers_.size());
+  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.size());
+  ASSERT_EQ(1, bm_->available_containers_by_data_dir_.begin()->second.size());
+
+  // Find location of 2nd record in metadata file and corrupt it.
+  // This is an unrecoverable error because it's in the middle of the file.
+  unique_ptr<RandomAccessFile> meta_file;
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  ReadablePBContainerFile pb_reader(std::move(meta_file));
+  ASSERT_OK(pb_reader.Open());
+  BlockRecordPB record;
+  ASSERT_OK(pb_reader.ReadNextPB(&record));
+  uint64_t offset = pb_reader.offset();
+
+  uint64_t latest_meta_size;
+  ASSERT_OK(env_->GetFileSize(metadata_path, &latest_meta_size));
+  ASSERT_OK(env_->NewRandomAccessFile(metadata_path, &meta_file));
+  Slice result;
+  gscoped_ptr<uint8_t[]> scratch(new uint8_t[latest_meta_size]);
+  ASSERT_OK(ReadFully(meta_file.get(), 0, latest_meta_size, &result, scratch.get()));
+  string data = result.ToString();
+  // Flip the high bit of the length field, which is a 4-byte little endian
+  // unsigned integer. This will cause the length field to represent a large
+  // value and also cause the length checksum not to validate.
+  data[offset + 3] ^= 1 << 7;
+  unique_ptr<WritableFile> writable_file;
+  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
+  ASSERT_OK(writable_file->Append(data));
+  ASSERT_OK(writable_file->Close());
+
+  // Now try to reopen the container.
+  // This should look like a bad checksum, and it's not recoverable.
+  s = ReopenBlockManager();
+  ASSERT_TRUE(s.IsCorruption());
+  ASSERT_STR_CONTAINS(s.ToString(), "Incorrect checksum");
+
+  // Now truncate both the data and metadata files.
+  // This should be recoverable. See KUDU-668.
+  ASSERT_OK(env_->NewWritableFile(metadata_path, &writable_file));
+  ASSERT_OK(writable_file->Close());
+  ASSERT_OK(env_->NewWritableFile(data_path, &writable_file));
+  ASSERT_OK(writable_file->Close());
+
+  ASSERT_OK(ReopenBlockManager());
+}
+
+// Regression test for a crash when a container's append offset exceeded its
+// preallocation offset.
+TEST_F(LogBlockManagerTest, TestAppendExceedsPreallocation) {
+  FLAGS_log_container_preallocate_bytes = 1;
+
+  // Create a container, preallocate it by one byte, and append more than one.
+  unique_ptr<WritableBlock> writer;
+  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(writer->Append("hello world"));
+  ASSERT_OK(writer->Close());
+
+  // On second append, don't crash just because the append offset is ahead of
+  // the preallocation offset!
+  ASSERT_OK(bm_->CreateBlock(&writer));
+  ASSERT_OK(writer->Append("hello world"));
+}
+
+TEST_F(LogBlockManagerTest, TestPreallocationAndTruncation) {
+  // Ensure preallocation window is greater than the container size itself.
+  FLAGS_log_container_max_size = 1024 * 1024;
+  FLAGS_log_container_preallocate_bytes = 32 * 1024 * 1024;
+
+  // Fill up one container.
+  unique_ptr<WritableBlock> writer;
+  ASSERT_OK(bm_->CreateBlock(&writer));
+  unique_ptr<uint8_t[]> data(new uint8_t[FLAGS_log_container_max_size]);
+  memset(data.get(), 0, FLAGS_log_container_max_size);
+  ASSERT_OK(writer->Append({ data.get(), FLAGS_log_container_max_size } ));
+  string fname;
+  NO_FATALS(GetOnlyContainerDataFile(&fname));
+  uint64_t size_after_append;
+  ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_append));
+  ASSERT_EQ(FLAGS_log_container_preallocate_bytes, size_after_append);
+
+  // Close it. The extra preallocated space should be truncated off the file.
+  ASSERT_OK(writer->Close());
+  uint64_t size_after_close;
+  ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_close));
+  ASSERT_EQ(FLAGS_log_container_max_size, size_after_close);
+
+  // Now test the same startup behavior by artificially growing the file
+  // and reopening the block manager.
+  //
+  // Try preallocating in two ways: once with a change to the file size and
+  // once without. The second way serves as a proxy for XFS's speculative
+  // preallocation behavior, described in KUDU-1856.
+  for (RWFile::PreAllocateMode mode : {RWFile::CHANGE_FILE_SIZE,
+                                       RWFile::DONT_CHANGE_FILE_SIZE}) {
+    LOG(INFO) << "Pass " << mode;
+    unique_ptr<RWFile> data_file;
+    RWFileOptions opts;
+    opts.mode = Env::OPEN_EXISTING;
+    ASSERT_OK(env_->NewRWFile(opts, fname, &data_file));
+    ASSERT_OK(data_file->PreAllocate(size_after_close, size_after_close, mode));
+    uint64_t size_after_preallocate;
+    ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_preallocate));
+    ASSERT_EQ(size_after_close * 2, size_after_preallocate);
+
+    if (mode == RWFile::DONT_CHANGE_FILE_SIZE) {
+      // Some older versions of ext4 (such as on el6) do not appear to truncate
+      // unwritten preallocated space that extends beyond the file size. Let's
+      // coax them by writing a single byte into that space.
+      //
+      // Note: this doesn't invalidate the usefulness of this test, as it's
+      // quite possible for us to have written a little bit of data into XFS's
+      // speculative preallocated area.
+      ASSERT_OK(data_file->Write(size_after_close, "a"));
+    }
+
+    // Now reopen the block manager. It should notice that the container grew
+    // and truncate the extra preallocated space off again.
+    ASSERT_OK(ReopenBlockManager());
+    uint64_t size_after_reopen;
+    ASSERT_OK(env_->GetFileSizeOnDisk(fname, &size_after_reopen));
+    ASSERT_EQ(FLAGS_log_container_max_size, size_after_reopen);
+  }
+}
+
+TEST_F(LogBlockManagerTest, TestContainerWithManyHoles) {
+  // This is a regression test of sorts for KUDU-1508, though it doesn't
+  // actually fail if the fix is missing; it just corrupts the filesystem.
+
+  static unordered_map<int, int> block_size_to_last_interior_node_block_number =
+     {{1024, 168},
+      {2048, 338},
+      {4096, 680}};
+
+  const int kNumBlocks = 16 * 1024;
+
+  uint64_t fs_block_size;
+  ASSERT_OK(env_->GetBlockSize(test_dir_, &fs_block_size));
+  if (!ContainsKey(block_size_to_last_interior_node_block_number,
+                   fs_block_size)) {
+    LOG(INFO) << Substitute("Filesystem block size is $0, skipping test",
+                            fs_block_size);
+    return;
+  }
+  int last_interior_node_block_number = FindOrDie(
+      block_size_to_last_interior_node_block_number, fs_block_size);
+
+  ASSERT_GE(kNumBlocks, last_interior_node_block_number);
+
+  // Create a bunch of blocks. They should all go in one container (unless
+  // the container becomes full).
+  LOG(INFO) << Substitute("Creating $0 blocks", kNumBlocks);
+  vector<BlockId> ids;
+  for (int i = 0; i < kNumBlocks; i++) {
+    unique_ptr<WritableBlock> block;
+    ASSERT_OK(bm_->CreateBlock(&block));
+    ASSERT_OK(block->Append("aaaa"));
+    ASSERT_OK(block->Close());
+    ids.push_back(block->id());
+  }
+
+  // Delete every other block. In effect, this maximizes the number of extents
+  // in the container by forcing the filesystem to alternate every hole with
+  // a live extent.
+  LOG(INFO) << "Deleting every other block";
+  for (int i = 0; i < ids.size(); i += 2) {
+    ASSERT_OK(bm_->DeleteBlock(ids[i]));
+  }
+
+  // Delete all of the blocks belonging to the interior node. If KUDU-1508
+  // applies, this should corrupt the filesystem.
+  LOG(INFO) << Substitute("Deleting remaining blocks up to block number $0",
+                          last_interior_node_block_number);
+  for (int i = 1; i < last_interior_node_block_number; i += 2) {
+    ASSERT_OK(bm_->DeleteBlock(ids[i]));
+  }
+}
+
+TEST_F(LogBlockManagerTest, TestParseKernelRelease) {
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("1.7.0.0.el6.x86_64"));
+
+  // no el6 infix
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32"));
+
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.33-1.0.0.el6.x86_64"));
+
+  // Make sure it's a numeric sort, not a lexicographic one.
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-1000.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.100-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.10.0-1.0.0.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("10.0.0-1.0.0.el6.x86_64"));
+
+  // Kernels from el6.6, el6.7: buggy
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-504.30.3.el6.x86_64"));
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.el6.x86_64"));
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-573.1.1.el6.x86_64"));
+
+  // Kernel from el6.8: buggy
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.el6.x86_64"));
+
+  // Kernels from el6.8 update stream before a fix was applied: buggy.
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.11.1.el6.x86_64"));
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.1.el6.x86_64"));
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.14.2.el6.x86_64"));
+
+  // Kernels from el6.8 update stream after a fix was applied: not buggy.
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.15.1.el6.x86_64"));
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-642.18.1.el6.x86_64"));
+
+  // Kernel from el6.9 development prior to fix: buggy.
+  ASSERT_TRUE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-673.0.0.el6.x86_64"));
+
+  // Kernel from el6.9 development post-fix: not buggy.
+  ASSERT_FALSE(LogBlockManager::IsBuggyEl6Kernel("2.6.32-674.0.0.el6.x86_64"));
+}
+
+TEST_F(LogBlockManagerTest, TestLookupBlockLimit) {
+  int64_t limit_1024 = LogBlockManager::LookupBlockLimit(1024);
+  int64_t limit_2048 = LogBlockManager::LookupBlockLimit(2048);
+  int64_t limit_4096 = LogBlockManager::LookupBlockLimit(4096);
+
+  // Test the floor behavior in LookupBlockLimit().
+  for (int i = 0; i < 16384; i++) {
+    if (i < 2048) {
+      ASSERT_EQ(limit_1024, LogBlockManager::LookupBlockLimit(i));
+    } else if (i < 4096) {
+      ASSERT_EQ(limit_2048, LogBlockManager::LookupBlockLimit(i));
+    } else {
+      ASSERT_EQ(limit_4096, LogBlockManager::LookupBlockLimit(i));
+    }
+  }
+}
+
+TEST_F(LogBlockManagerTest, TestContainerBlockLimiting) {
+  const int kNumBlocks = 1000;
+
+  // Creates 'kNumBlocks' blocks with minimal data.
+  auto create_some_blocks = [&]() -> Status {
+    for (int i = 0; i < kNumBlocks; i++) {
+      unique_ptr<WritableBlock> block;
+      RETURN_NOT_OK(bm_->CreateBlock(&block));
+      RETURN_NOT_OK(block->Append("aaaa"));
+      RETURN_NOT_OK(block->Close());
+    }
+    return Status::OK();
+  };
+
+  // All of these blocks should fit into one container.
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(1));
+
+  // With a limit imposed, the existing container is immediately full, and we
+  // need a few more to satisfy another 'kNumBlocks' blocks.
+  FLAGS_log_container_max_blocks = 400;
+  ASSERT_OK(ReopenBlockManager());
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(4));
+
+  // Now remove the limit and create more blocks. They should go into existing
+  // containers, which are now no longer full.
+  FLAGS_log_container_max_blocks = -1;
+  ASSERT_OK(ReopenBlockManager());
+
+  ASSERT_OK(create_some_blocks());
+  NO_FATALS(AssertNumContainers(4));
+}
+
+} // namespace fs
+} // namespace kudu