You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/30 18:56:41 UTC
[kudu] 07/07: fs: remove kudu::Bind usage from DataDir closures
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 676a765f2f8744bd7273b7358ab3a8b22c148edb
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Sat Mar 28 01:46:12 2020 -0700
fs: remove kudu::Bind usage from DataDir closures
Change-Id: Id58b8740ccc33762383a48680c726e8d30e7f25c
Reviewed-on: http://gerrit.cloudera.org:8080/15581
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/fs/dir_manager.cc | 10 +++++-----
src/kudu/fs/dir_manager.h | 4 ++--
src/kudu/fs/file_block_manager.cc | 12 ++++++------
src/kudu/fs/log_block_manager.cc | 39 ++++++++++++++++++++-------------------
4 files changed, 33 insertions(+), 32 deletions(-)
diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index c76f773..fd8a81f 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -34,7 +34,6 @@
#include "kudu/fs/dir_util.h"
#include "kudu/fs/fs.pb.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/join.h"
@@ -103,12 +102,12 @@ void Dir::Shutdown() {
is_shutdown_ = true;
}
-void Dir::ExecClosure(const Closure& task) {
- Status s = pool_->Submit([task]() { task.Run(); });
+void Dir::ExecClosure(const std::function<void()>& task) {
+ Status s = pool_->Submit(task);
if (!s.ok()) {
WARN_NOT_OK(
s, "Could not submit task to thread pool, running it synchronously");
- task.Run();
+ task();
}
}
@@ -577,7 +576,8 @@ Status DirManager::Open() {
// Use the per-dir thread pools to delete temporary files in parallel.
for (const auto& dir : dirs) {
if (dir->instance()->healthy()) {
- dir->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dir->dir()));
+ auto* d = dir.get();
+ dir->ExecClosure([this, d]() { DeleteTmpFilesRecursively(this->env_, d->dir()); });
}
}
for (const auto& dir : dirs) {
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index e1beae0..d162ae8 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -19,6 +19,7 @@
#include <stdint.h>
+#include <functional>
#include <memory>
#include <mutex>
#include <set>
@@ -26,7 +27,6 @@
#include <unordered_map>
#include <vector>
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/locks.h"
@@ -109,7 +109,7 @@ class Dir {
//
// Normally the task is performed asynchronously. However, if submission to
// the pool fails, it runs synchronously on the current thread.
- void ExecClosure(const Closure& task);
+ void ExecClosure(const std::function<void()>& task);
// Waits for any outstanding closures submitted via ExecClosure() to finish.
void WaitOnClosures();
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index ae8f1bc..2e631cb 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -37,7 +37,6 @@
#include "kudu/fs/dir_manager.h"
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs_report.h"
-#include "kudu/gutil/bind.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/map-util.h"
@@ -943,11 +942,12 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
vector<vector<BlockId>> block_id_vecs(dds.size());
vector<Status> statuses(dds.size());
for (int i = 0; i < dds.size(); i++) {
- dds[i]->ExecClosure(Bind(&GetAllBlockIdsForDir,
- env_,
- dds[i].get(),
- &block_id_vecs[i],
- &statuses[i]));
+ auto* dd = dds[i].get();
+ auto* bid_vec = &block_id_vecs[i];
+ auto* s = &statuses[i];
+ dds[i]->ExecClosure([this, dd, bid_vec, s]() {
+ GetAllBlockIdsForDir(this->env_, dd, bid_vec, s);
+ });
}
for (const auto& dd : dd_manager_->dirs()) {
dd->WaitOnClosures();
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index cb40778..945d53f 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -22,6 +22,7 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
+#include <functional>
#include <map>
#include <memory>
#include <mutex>
@@ -44,9 +45,6 @@
#include "kudu/fs/error_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_report.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/callback.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
@@ -502,7 +500,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
//
// Normally the task is performed asynchronously. However, if submission to
// the pool fails, it runs synchronously on the current thread.
- void ExecClosure(const Closure& task);
+ void ExecClosure(const std::function<void()>& task);
// Produces a debug-friendly string representation of this container.
string ToString() const;
@@ -1337,7 +1335,7 @@ void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
live_blocks_.IncrementBy(-1);
}
-void LogBlockContainer::ExecClosure(const Closure& task) {
+void LogBlockContainer::ExecClosure(const std::function<void()>& task) {
data_dir_->ExecClosure(task);
}
@@ -1496,11 +1494,11 @@ LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
Substitute("could not coalesce hole punching for container: $0",
container->ToString()));
+ scoped_refptr<LogBlockContainer> self(container);
for (const auto& interval : entry.second) {
- container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync,
- container,
- interval.first,
- interval.second - interval.first));
+ container->ExecClosure([self, interval]() {
+ self->ContainerDeletionAsync(interval.first, interval.second - interval.first);
+ });
}
}
}
@@ -2081,12 +2079,12 @@ Status LogBlockManager::Open(FsReport* report) {
}
// Open the data dir asynchronously.
- dd->ExecClosure(
- Bind(&LogBlockManager::OpenDataDir,
- Unretained(this),
- dd.get(),
- &container_results[i],
- &statuses[i]));
+ auto* dd_raw = dd.get();
+ auto* results = &container_results[i];
+ auto* s = &statuses[i];
+ dd->ExecClosure([this, dd_raw, results, s]() {
+ this->OpenDataDir(dd_raw, results, s);
+ });
}
// Wait for the opens to complete.
@@ -2136,8 +2134,9 @@ Status LogBlockManager::Open(FsReport* report) {
}
if (do_repair) {
dir_results[i] = std::move(dir_result);
- dd->ExecClosure(Bind(&LogBlockManager::RepairTask, Unretained(this),
- dd.get(), Unretained(dir_results[i].get())));
+ auto* dd_raw = dd.get();
+ auto* dr = dir_results[i].get();
+ dd->ExecClosure([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); });
}
}
@@ -2558,8 +2557,10 @@ void LogBlockManager::OpenDataDir(
}
// Load the container's records asynchronously.
- dir->ExecClosure(Bind(&LogBlockManager::LoadContainer, Unretained(this),
- dir, container, Unretained(results->back().get())));
+ auto* r = results->back().get();
+ dir->ExecClosure([this, dir, container, r]() {
+ this->LoadContainer(dir, container, r);
+ });
}
}