You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/28 08:13:38 UTC

[kudu] branch master updated: log: clean up entry batch append callback usage

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new daced7e  log: clean up entry batch append callback usage
daced7e is described below

commit daced7ec446ea8e7e1a8eeebe21e08ee038f285a
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Mar 27 13:04:31 2020 -0700

    log: clean up entry batch append callback usage
    
    I started with the desire to pass these callbacks by value and move them,
    then ended up doing some additional cleanup.
    
    Change-Id: I34bbe51fbe824745fc2705819546ddf14714da52
    Reviewed-on: http://gerrit.cloudera.org:8080/15571
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/consensus/log.cc      | 92 +++++++++++++++++++++---------------------
 src/kudu/consensus/log.h       | 52 +++++++++++++-----------
 src/kudu/consensus/log_util.cc | 13 ------
 src/kudu/consensus/log_util.h  |  6 ---
 4 files changed, 74 insertions(+), 89 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 576b44e..2e6a514 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -373,7 +373,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
 
   bool is_all_commits = true;
-  for (LogEntryBatch* entry_batch : entry_batches) {
+  for (auto* entry_batch : entry_batches) {
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
     Status s = log_->WriteBatch(entry_batch);
     if (PREDICT_FALSE(!s.ok())) {
@@ -382,10 +382,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
       // abort all subsequent transactions in this batch or allow
       // them to be appended? What about transactions in future
       // batches?
-      if (entry_batch->callback()) {
-        entry_batch->callback()(s);
-        entry_batch->callback_ = nullptr;
-      }
+      entry_batch->SetAppendError(s);
     }
     if (is_all_commits && entry_batch->type_ != COMMIT) {
       is_all_commits = false;
@@ -398,25 +395,21 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   }
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
-    for (LogEntryBatch* entry_batch : entry_batches) {
-      if (entry_batch->callback()) {
-        entry_batch->callback()(s);
-      }
-      delete entry_batch;
+    for (auto* entry_batch : entry_batches) {
+      entry_batch->SetAppendError(s);
     }
   } else {
-    TRACE_EVENT0("log", "Callbacks");
     VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
-    SCOPED_WATCH_STACK(100);
-    for (LogEntryBatch* entry_batch : entry_batches) {
-      if (PREDICT_TRUE(entry_batch->callback())) {
-        entry_batch->callback()(Status::OK());
-      }
-      // It's important to delete each batch as we see it, because
-      // deleting it may free up memory from memory trackers, and the
-      // callback of a later batch may want to use that memory.
-      delete entry_batch;
-    }
+  }
+  TRACE_EVENT0("log", "Callbacks");
+  SCOPED_WATCH_STACK(100);
+  for (auto* entry_batch : entry_batches) {
+    entry_batch->RunCallback();
+
+    // It's important to delete each batch as we see it, because
+    // deleting it may free up memory from memory trackers, and the
+    // callback of a later batch may want to use that memory.
+    delete entry_batch;
   }
 }
 
@@ -836,23 +829,20 @@ Status Log::Init() {
   return Status::OK();
 }
 
-Status Log::CreateBatchFromPB(LogEntryTypePB type,
-                              unique_ptr<LogEntryBatchPB> entry_batch_pb,
-                              unique_ptr<LogEntryBatch>* entry_batch) {
+unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
+    LogEntryTypePB type,
+    unique_ptr<LogEntryBatchPB> entry_batch_pb,
+    StatusCallback cb) {
   int num_ops = entry_batch_pb->entry_size();
   unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
-      type, std::move(entry_batch_pb), num_ops));
+      type, std::move(entry_batch_pb), num_ops, std::move(cb)));
   new_entry_batch->Serialize();
   TRACE("Serialized $0 byte log entry", new_entry_batch->total_size_bytes());
-
-  *entry_batch = std::move(new_entry_batch);
-  return Status::OK();
+  return new_entry_batch;
 }
 
-Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallback& callback) {
+Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch) {
   TRACE_EVENT0("log", "Log::AsyncAppend");
-
-  entry_batch->set_callback(callback);
   TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
   if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
@@ -864,17 +854,22 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
 }
 
 Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
-                                  const StatusCallback& callback) {
-  unique_ptr<LogEntryBatchPB> batch_pb = CreateBatchFromAllocatedOperations(replicates);
-
-  unique_ptr<LogEntryBatch> batch;
-  RETURN_NOT_OK(CreateBatchFromPB(REPLICATE, std::move(batch_pb), &batch));
+                                  StatusCallback callback) {
+  unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
+  batch_pb->mutable_entry()->Reserve(replicates.size());
+  for (const auto& r : replicates) {
+    LogEntryPB* entry_pb = batch_pb->add_entry();
+    entry_pb->set_type(REPLICATE);
+    entry_pb->set_allocated_replicate(r->get());
+  }
+  unique_ptr<LogEntryBatch> batch = CreateBatchFromPB(
+      REPLICATE, std::move(batch_pb), std::move(callback));
   batch->SetReplicates(replicates);
-  return AsyncAppend(std::move(batch), callback);
+  return AsyncAppend(std::move(batch));
 }
 
 Status Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
-                              const StatusCallback& callback) {
+                              StatusCallback callback) {
   MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
 
   unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
@@ -882,9 +877,9 @@ Status Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
   entry->set_type(COMMIT);
   entry->set_allocated_commit(commit_msg.release());
 
-  unique_ptr<LogEntryBatch> entry_batch;
-  RETURN_NOT_OK(CreateBatchFromPB(COMMIT, std::move(batch_pb), &entry_batch));
-  AsyncAppend(std::move(entry_batch), callback);
+  unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
+      COMMIT, std::move(batch_pb), std::move(callback));
+  AsyncAppend(std::move(entry_batch));
   return Status::OK();
 }
 
@@ -1012,7 +1007,8 @@ void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
 Status Log::Append(LogEntryPB* entry) {
   unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
   entry_batch_pb->mutable_entry()->AddAllocated(entry);
-  LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
+  LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1,
+                            &DoNothingStatusCB);
   entry_batch.Serialize();
   Status s = WriteBatch(&entry_batch);
   if (s.ok()) {
@@ -1027,10 +1023,10 @@ Status Log::WaitUntilAllFlushed() {
   // the async api.
   unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
   entry_batch->add_entry()->set_type(log::FLUSH_MARKER);
-  unique_ptr<LogEntryBatch> reserved_entry_batch;
-  RETURN_NOT_OK(CreateBatchFromPB(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch));
   Synchronizer s;
-  AsyncAppend(std::move(reserved_entry_batch), s.AsStatusCallback());
+  unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(
+      FLUSH_MARKER, std::move(entry_batch), s.AsStatusCallback());
+  AsyncAppend(std::move(reserved_entry_batch));
   return s.Wait();
 }
 
@@ -1241,13 +1237,15 @@ Log::~Log() {
 
 LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
                              unique_ptr<LogEntryBatchPB> entry_batch_pb,
-                             size_t count)
+                             size_t count,
+                             StatusCallback cb)
     : type_(type),
       entry_batch_pb_(std::move(entry_batch_pb)),
       total_size_bytes_(
           PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER) ?
           0 : entry_batch_pb_->ByteSize()),
-      count_(count) {
+      count_(count),
+      callback_(std::move(cb)) {
 }
 
 LogEntryBatch::~LogEntryBatch() {
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index ceeba63..68a24c2 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <limits>
 #include <list>
 #include <map>
@@ -296,13 +297,13 @@ class Log : public RefCountedThreadSafe<Log> {
   // Append the given set of replicate messages, asynchronously.
   // This requires that the replicates have already been assigned OpIds.
   Status AsyncAppendReplicates(const std::vector<consensus::ReplicateRefPtr>& replicates,
-                               const StatusCallback& callback);
+                               StatusCallback callback);
 
   // Append the given commit message, asynchronously.
   //
   // Returns a bad status if the log is already shut down.
   Status AsyncAppendCommit(std::unique_ptr<consensus::CommitMsg> commit_msg,
-                           const StatusCallback& callback);
+                           StatusCallback callback);
 
   // Blocks the current thread until all the entries in the log queue
   // are flushed and fsynced (if fsync of log entries is enabled).
@@ -441,14 +442,17 @@ class Log : public RefCountedThreadSafe<Log> {
   // Sets that the current active segment is idle.
   void SetActiveSegmentIdle();
 
-  static Status CreateBatchFromPB(LogEntryTypePB type,
-                                  std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
-                                  std::unique_ptr<LogEntryBatch>* entry_batch);
+  // Creates a new LogEntryBatch from 'entry_batch_pb'; all entries must be of
+  // type 'type'.
+  //
+  // After the batch is appended to the log, 'cb' will be invoked with the
+  // result status of the append.
+  static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
+      LogEntryTypePB type, std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+      StatusCallback cb);
 
-  // Asynchronously appends 'entry_batch' to the log. Once the append
-  // completes and is synced, 'callback' will be invoked.
-  Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch,
-                     const StatusCallback& callback);
+  // Asynchronously appends 'entry_batch' to the log.
+  Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
 
   // Writes serialized contents of 'entry' to the log. This is not thread-safe.
   Status WriteBatch(LogEntryBatch* entry_batch);
@@ -561,24 +565,12 @@ class LogEntryBatch {
 
   LogEntryBatch(LogEntryTypePB type,
                 std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
-                size_t count);
+                size_t count,
+                StatusCallback cb);
 
   // Serializes contents of the entry to an internal buffer.
   void Serialize();
 
-  // Sets the callback that will be invoked after the entry is
-  // appended and synced to disk
-  void set_callback(const StatusCallback& cb) {
-    callback_ = cb;
-  }
-
-  // Returns the callback that will be invoked after the entry is
-  // appended and synced to disk.
-  const StatusCallback& callback() {
-    return callback_;
-  }
-
-
   // Returns a Slice representing the serialized contents of the
   // entry.
   Slice data() const {
@@ -605,6 +597,17 @@ class LogEntryBatch {
     replicates_ = replicates;
   }
 
+  void SetAppendError(const Status& s) {
+    DCHECK(!s.ok());
+    if (append_status_.ok()) {
+      append_status_ = s;
+    }
+  }
+
+  void RunCallback() {
+    callback_(append_status_);
+  }
+
   // The type of entries in this batch.
   const LogEntryTypePB type_;
 
@@ -631,6 +634,9 @@ class LogEntryBatch {
   // 'Serialize()'
   faststring buffer_;
 
+  // Tracks whether this batch was successfully appended to the log.
+  Status append_status_;
+
   DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
 };
 
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 6060c2a..865fb07 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -27,7 +27,6 @@
 
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/opid_util.h"
-#include "kudu/consensus/ref_counted_replicate.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/split.h"
@@ -859,18 +858,6 @@ void WritableLogSegment::GoIdle() {
   compress_buf_.shrink_to_fit();
 }
 
-unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
-    const vector<consensus::ReplicateRefPtr>& msgs) {
-  unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
-  entry_batch->mutable_entry()->Reserve(msgs.size());
-  for (const auto& msg : msgs) {
-    LogEntryPB* entry_pb = entry_batch->add_entry();
-    entry_pb->set_type(log::REPLICATE);
-    entry_pb->set_allocated_replicate(msg->get());
-  }
-  return entry_batch;
-}
-
 bool IsLogFileName(const string& fname) {
   if (HasPrefixString(fname, ".")) {
     // Hidden file or ./..
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 62571f1..630cb24 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -29,7 +29,6 @@
 
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/opid.pb.h"
-#include "kudu/consensus/ref_counted_replicate.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/atomic.h"
@@ -503,11 +502,6 @@ class WritableLogSegment {
   DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
 };
 
-// Return a newly created batch that contains the pre-allocated
-// ReplicateMsgs in 'msgs'.
-std::unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
-    const std::vector<consensus::ReplicateRefPtr>& msgs);
-
 // Checks if 'fname' is a correctly formatted name of log segment file.
 bool IsLogFileName(const std::string& fname);