You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/28 08:13:38 UTC
[kudu] branch master updated: log: clean up entry batch append callback usage
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new daced7e log: clean up entry batch append callback usage
daced7e is described below
commit daced7ec446ea8e7e1a8eeebe21e08ee038f285a
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Mar 27 13:04:31 2020 -0700
log: clean up entry batch append callback usage
I started with the desire to pass these callbacks by value and move them,
then ended up doing some additional cleanup.
Change-Id: I34bbe51fbe824745fc2705819546ddf14714da52
Reviewed-on: http://gerrit.cloudera.org:8080/15571
Reviewed-by: Andrew Wong <aw...@cloudera.com>
Tested-by: Adar Dembo <ad...@cloudera.com>
---
src/kudu/consensus/log.cc | 92 +++++++++++++++++++++---------------------
src/kudu/consensus/log.h | 52 +++++++++++++-----------
src/kudu/consensus/log_util.cc | 13 ------
src/kudu/consensus/log_util.h | 6 ---
4 files changed, 74 insertions(+), 89 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 576b44e..2e6a514 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -373,7 +373,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
SCOPED_LATENCY_METRIC(log_->ctx_.metrics, group_commit_latency);
bool is_all_commits = true;
- for (LogEntryBatch* entry_batch : entry_batches) {
+ for (auto* entry_batch : entry_batches) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
Status s = log_->WriteBatch(entry_batch);
if (PREDICT_FALSE(!s.ok())) {
@@ -382,10 +382,7 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
// abort all subsequent transactions in this batch or allow
// them to be appended? What about transactions in future
// batches?
- if (entry_batch->callback()) {
- entry_batch->callback()(s);
- entry_batch->callback_ = nullptr;
- }
+ entry_batch->SetAppendError(s);
}
if (is_all_commits && entry_batch->type_ != COMMIT) {
is_all_commits = false;
@@ -398,25 +395,21 @@ void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
}
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
- for (LogEntryBatch* entry_batch : entry_batches) {
- if (entry_batch->callback()) {
- entry_batch->callback()(s);
- }
- delete entry_batch;
+ for (auto* entry_batch : entry_batches) {
+ entry_batch->SetAppendError(s);
}
} else {
- TRACE_EVENT0("log", "Callbacks");
VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
- SCOPED_WATCH_STACK(100);
- for (LogEntryBatch* entry_batch : entry_batches) {
- if (PREDICT_TRUE(entry_batch->callback())) {
- entry_batch->callback()(Status::OK());
- }
- // It's important to delete each batch as we see it, because
- // deleting it may free up memory from memory trackers, and the
- // callback of a later batch may want to use that memory.
- delete entry_batch;
- }
+ }
+ TRACE_EVENT0("log", "Callbacks");
+ SCOPED_WATCH_STACK(100);
+ for (auto* entry_batch : entry_batches) {
+ entry_batch->RunCallback();
+
+ // It's important to delete each batch as we see it, because
+ // deleting it may free up memory from memory trackers, and the
+ // callback of a later batch may want to use that memory.
+ delete entry_batch;
}
}
@@ -836,23 +829,20 @@ Status Log::Init() {
return Status::OK();
}
-Status Log::CreateBatchFromPB(LogEntryTypePB type,
- unique_ptr<LogEntryBatchPB> entry_batch_pb,
- unique_ptr<LogEntryBatch>* entry_batch) {
+unique_ptr<LogEntryBatch> Log::CreateBatchFromPB(
+ LogEntryTypePB type,
+ unique_ptr<LogEntryBatchPB> entry_batch_pb,
+ StatusCallback cb) {
int num_ops = entry_batch_pb->entry_size();
unique_ptr<LogEntryBatch> new_entry_batch(new LogEntryBatch(
- type, std::move(entry_batch_pb), num_ops));
+ type, std::move(entry_batch_pb), num_ops, std::move(cb)));
new_entry_batch->Serialize();
TRACE("Serialized $0 byte log entry", new_entry_batch->total_size_bytes());
-
- *entry_batch = std::move(new_entry_batch);
- return Status::OK();
+ return new_entry_batch;
}
-Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallback& callback) {
+Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch) {
TRACE_EVENT0("log", "Log::AsyncAppend");
-
- entry_batch->set_callback(callback);
TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
@@ -864,17 +854,22 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
}
Status Log::AsyncAppendReplicates(const vector<ReplicateRefPtr>& replicates,
- const StatusCallback& callback) {
- unique_ptr<LogEntryBatchPB> batch_pb = CreateBatchFromAllocatedOperations(replicates);
-
- unique_ptr<LogEntryBatch> batch;
- RETURN_NOT_OK(CreateBatchFromPB(REPLICATE, std::move(batch_pb), &batch));
+ StatusCallback callback) {
+ unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
+ batch_pb->mutable_entry()->Reserve(replicates.size());
+ for (const auto& r : replicates) {
+ LogEntryPB* entry_pb = batch_pb->add_entry();
+ entry_pb->set_type(REPLICATE);
+ entry_pb->set_allocated_replicate(r->get());
+ }
+ unique_ptr<LogEntryBatch> batch = CreateBatchFromPB(
+ REPLICATE, std::move(batch_pb), std::move(callback));
batch->SetReplicates(replicates);
- return AsyncAppend(std::move(batch), callback);
+ return AsyncAppend(std::move(batch));
}
Status Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
- const StatusCallback& callback) {
+ StatusCallback callback) {
MAYBE_FAULT(FLAGS_fault_crash_before_append_commit);
unique_ptr<LogEntryBatchPB> batch_pb(new LogEntryBatchPB);
@@ -882,9 +877,9 @@ Status Log::AsyncAppendCommit(unique_ptr<consensus::CommitMsg> commit_msg,
entry->set_type(COMMIT);
entry->set_allocated_commit(commit_msg.release());
- unique_ptr<LogEntryBatch> entry_batch;
- RETURN_NOT_OK(CreateBatchFromPB(COMMIT, std::move(batch_pb), &entry_batch));
- AsyncAppend(std::move(entry_batch), callback);
+ unique_ptr<LogEntryBatch> entry_batch = CreateBatchFromPB(
+ COMMIT, std::move(batch_pb), std::move(callback));
+ AsyncAppend(std::move(entry_batch));
return Status::OK();
}
@@ -1012,7 +1007,8 @@ void Log::GetSegmentsToGCUnlocked(RetentionIndexes retention_indexes,
Status Log::Append(LogEntryPB* entry) {
unique_ptr<LogEntryBatchPB> entry_batch_pb(new LogEntryBatchPB);
entry_batch_pb->mutable_entry()->AddAllocated(entry);
- LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
+ LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1,
+ &DoNothingStatusCB);
entry_batch.Serialize();
Status s = WriteBatch(&entry_batch);
if (s.ok()) {
@@ -1027,10 +1023,10 @@ Status Log::WaitUntilAllFlushed() {
// the async api.
unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
entry_batch->add_entry()->set_type(log::FLUSH_MARKER);
- unique_ptr<LogEntryBatch> reserved_entry_batch;
- RETURN_NOT_OK(CreateBatchFromPB(FLUSH_MARKER, std::move(entry_batch), &reserved_entry_batch));
Synchronizer s;
- AsyncAppend(std::move(reserved_entry_batch), s.AsStatusCallback());
+ unique_ptr<LogEntryBatch> reserved_entry_batch = CreateBatchFromPB(
+ FLUSH_MARKER, std::move(entry_batch), s.AsStatusCallback());
+ AsyncAppend(std::move(reserved_entry_batch));
return s.Wait();
}
@@ -1241,13 +1237,15 @@ Log::~Log() {
LogEntryBatch::LogEntryBatch(LogEntryTypePB type,
unique_ptr<LogEntryBatchPB> entry_batch_pb,
- size_t count)
+ size_t count,
+ StatusCallback cb)
: type_(type),
entry_batch_pb_(std::move(entry_batch_pb)),
total_size_bytes_(
PREDICT_FALSE(count == 1 && entry_batch_pb_->entry(0).type() == FLUSH_MARKER) ?
0 : entry_batch_pb_->ByteSize()),
- count_(count) {
+ count_(count),
+ callback_(std::move(cb)) {
}
LogEntryBatch::~LogEntryBatch() {
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index ceeba63..68a24c2 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -19,6 +19,7 @@
#include <atomic>
#include <cstddef>
#include <cstdint>
+#include <functional>
#include <limits>
#include <list>
#include <map>
@@ -296,13 +297,13 @@ class Log : public RefCountedThreadSafe<Log> {
// Append the given set of replicate messages, asynchronously.
// This requires that the replicates have already been assigned OpIds.
Status AsyncAppendReplicates(const std::vector<consensus::ReplicateRefPtr>& replicates,
- const StatusCallback& callback);
+ StatusCallback callback);
// Append the given commit message, asynchronously.
//
// Returns a bad status if the log is already shut down.
Status AsyncAppendCommit(std::unique_ptr<consensus::CommitMsg> commit_msg,
- const StatusCallback& callback);
+ StatusCallback callback);
// Blocks the current thread until all the entries in the log queue
// are flushed and fsynced (if fsync of log entries is enabled).
@@ -441,14 +442,17 @@ class Log : public RefCountedThreadSafe<Log> {
// Sets that the current active segment is idle.
void SetActiveSegmentIdle();
- static Status CreateBatchFromPB(LogEntryTypePB type,
- std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
- std::unique_ptr<LogEntryBatch>* entry_batch);
+ // Creates a new LogEntryBatch from 'entry_batch_pb'; all entries must be of
+ // type 'type'.
+ //
+ // After the batch is appended to the log, 'cb' will be invoked with the
+ // result status of the append.
+ static std::unique_ptr<LogEntryBatch> CreateBatchFromPB(
+ LogEntryTypePB type, std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
+ StatusCallback cb);
- // Asynchronously appends 'entry_batch' to the log. Once the append
- // completes and is synced, 'callback' will be invoked.
- Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch,
- const StatusCallback& callback);
+ // Asynchronously appends 'entry_batch' to the log.
+ Status AsyncAppend(std::unique_ptr<LogEntryBatch> entry_batch);
// Writes serialized contents of 'entry' to the log. This is not thread-safe.
Status WriteBatch(LogEntryBatch* entry_batch);
@@ -561,24 +565,12 @@ class LogEntryBatch {
LogEntryBatch(LogEntryTypePB type,
std::unique_ptr<LogEntryBatchPB> entry_batch_pb,
- size_t count);
+ size_t count,
+ StatusCallback cb);
// Serializes contents of the entry to an internal buffer.
void Serialize();
- // Sets the callback that will be invoked after the entry is
- // appended and synced to disk
- void set_callback(const StatusCallback& cb) {
- callback_ = cb;
- }
-
- // Returns the callback that will be invoked after the entry is
- // appended and synced to disk.
- const StatusCallback& callback() {
- return callback_;
- }
-
-
// Returns a Slice representing the serialized contents of the
// entry.
Slice data() const {
@@ -605,6 +597,17 @@ class LogEntryBatch {
replicates_ = replicates;
}
+ void SetAppendError(const Status& s) {
+ DCHECK(!s.ok());
+ if (append_status_.ok()) {
+ append_status_ = s;
+ }
+ }
+
+ void RunCallback() {
+ callback_(append_status_);
+ }
+
// The type of entries in this batch.
const LogEntryTypePB type_;
@@ -631,6 +634,9 @@ class LogEntryBatch {
// 'Serialize()'
faststring buffer_;
+ // Tracks whether this batch was successfully appended to the log.
+ Status append_status_;
+
DISALLOW_COPY_AND_ASSIGN(LogEntryBatch);
};
diff --git a/src/kudu/consensus/log_util.cc b/src/kudu/consensus/log_util.cc
index 6060c2a..865fb07 100644
--- a/src/kudu/consensus/log_util.cc
+++ b/src/kudu/consensus/log_util.cc
@@ -27,7 +27,6 @@
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid_util.h"
-#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/split.h"
@@ -859,18 +858,6 @@ void WritableLogSegment::GoIdle() {
compress_buf_.shrink_to_fit();
}
-unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
- const vector<consensus::ReplicateRefPtr>& msgs) {
- unique_ptr<LogEntryBatchPB> entry_batch(new LogEntryBatchPB);
- entry_batch->mutable_entry()->Reserve(msgs.size());
- for (const auto& msg : msgs) {
- LogEntryPB* entry_pb = entry_batch->add_entry();
- entry_pb->set_type(log::REPLICATE);
- entry_pb->set_allocated_replicate(msg->get());
- }
- return entry_batch;
-}
-
bool IsLogFileName(const string& fname) {
if (HasPrefixString(fname, ".")) {
// Hidden file or ./..
diff --git a/src/kudu/consensus/log_util.h b/src/kudu/consensus/log_util.h
index 62571f1..630cb24 100644
--- a/src/kudu/consensus/log_util.h
+++ b/src/kudu/consensus/log_util.h
@@ -29,7 +29,6 @@
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/opid.pb.h"
-#include "kudu/consensus/ref_counted_replicate.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
@@ -503,11 +502,6 @@ class WritableLogSegment {
DISALLOW_COPY_AND_ASSIGN(WritableLogSegment);
};
-// Return a newly created batch that contains the pre-allocated
-// ReplicateMsgs in 'msgs'.
-std::unique_ptr<LogEntryBatchPB> CreateBatchFromAllocatedOperations(
- const std::vector<consensus::ReplicateRefPtr>& msgs);
-
// Checks if 'fname' is a correctly formatted name of log segment file.
bool IsLogFileName(const std::string& fname);