You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/30 18:56:41 UTC

[kudu] 07/07: fs: remove kudu::Bind usage from DataDir closures

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 676a765f2f8744bd7273b7358ab3a8b22c148edb
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Sat Mar 28 01:46:12 2020 -0700

    fs: remove kudu::Bind usage from DataDir closures
    
    Change-Id: Id58b8740ccc33762383a48680c726e8d30e7f25c
    Reviewed-on: http://gerrit.cloudera.org:8080/15581
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/fs/dir_manager.cc        | 10 +++++-----
 src/kudu/fs/dir_manager.h         |  4 ++--
 src/kudu/fs/file_block_manager.cc | 12 ++++++------
 src/kudu/fs/log_block_manager.cc  | 39 ++++++++++++++++++++-------------------
 4 files changed, 33 insertions(+), 32 deletions(-)

diff --git a/src/kudu/fs/dir_manager.cc b/src/kudu/fs/dir_manager.cc
index c76f773..fd8a81f 100644
--- a/src/kudu/fs/dir_manager.cc
+++ b/src/kudu/fs/dir_manager.cc
@@ -34,7 +34,6 @@
 
 #include "kudu/fs/dir_util.h"
 #include "kudu/fs/fs.pb.h"
-#include "kudu/gutil/bind.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/join.h"
@@ -103,12 +102,12 @@ void Dir::Shutdown() {
   is_shutdown_ = true;
 }
 
-void Dir::ExecClosure(const Closure& task) {
-  Status s = pool_->Submit([task]() { task.Run(); });
+void Dir::ExecClosure(const std::function<void()>& task) {
+  Status s = pool_->Submit(task);
   if (!s.ok()) {
     WARN_NOT_OK(
         s, "Could not submit task to thread pool, running it synchronously");
-    task.Run();
+    task();
   }
 }
 
@@ -577,7 +576,8 @@ Status DirManager::Open() {
   // Use the per-dir thread pools to delete temporary files in parallel.
   for (const auto& dir : dirs) {
     if (dir->instance()->healthy()) {
-      dir->ExecClosure(Bind(&DeleteTmpFilesRecursively, env_, dir->dir()));
+      auto* d = dir.get();
+      dir->ExecClosure([this, d]() { DeleteTmpFilesRecursively(this->env_, d->dir()); });
     }
   }
   for (const auto& dir : dirs) {
diff --git a/src/kudu/fs/dir_manager.h b/src/kudu/fs/dir_manager.h
index e1beae0..d162ae8 100644
--- a/src/kudu/fs/dir_manager.h
+++ b/src/kudu/fs/dir_manager.h
@@ -19,6 +19,7 @@
 
 #include <stdint.h>
 
+#include <functional>
 #include <memory>
 #include <mutex>
 #include <set>
@@ -26,7 +27,6 @@
 #include <unordered_map>
 #include <vector>
 
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/locks.h"
@@ -109,7 +109,7 @@ class Dir {
   //
   // Normally the task is performed asynchronously. However, if submission to
   // the pool fails, it runs synchronously on the current thread.
-  void ExecClosure(const Closure& task);
+  void ExecClosure(const std::function<void()>& task);
 
   // Waits for any outstanding closures submitted via ExecClosure() to finish.
   void WaitOnClosures();
diff --git a/src/kudu/fs/file_block_manager.cc b/src/kudu/fs/file_block_manager.cc
index ae8f1bc..2e631cb 100644
--- a/src/kudu/fs/file_block_manager.cc
+++ b/src/kudu/fs/file_block_manager.cc
@@ -37,7 +37,6 @@
 #include "kudu/fs/dir_manager.h"
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs_report.h"
-#include "kudu/gutil/bind.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/map-util.h"
@@ -943,11 +942,12 @@ Status FileBlockManager::GetAllBlockIds(vector<BlockId>* block_ids) {
   vector<vector<BlockId>> block_id_vecs(dds.size());
   vector<Status> statuses(dds.size());
   for (int i = 0; i < dds.size(); i++) {
-    dds[i]->ExecClosure(Bind(&GetAllBlockIdsForDir,
-                             env_,
-                             dds[i].get(),
-                             &block_id_vecs[i],
-                             &statuses[i]));
+    auto* dd = dds[i].get();
+    auto* bid_vec = &block_id_vecs[i];
+    auto* s = &statuses[i];
+    dds[i]->ExecClosure([this, dd, bid_vec, s]() {
+      GetAllBlockIdsForDir(this->env_, dd, bid_vec, s);
+    });
   }
   for (const auto& dd : dd_manager_->dirs()) {
     dd->WaitOnClosures();
diff --git a/src/kudu/fs/log_block_manager.cc b/src/kudu/fs/log_block_manager.cc
index cb40778..945d53f 100644
--- a/src/kudu/fs/log_block_manager.cc
+++ b/src/kudu/fs/log_block_manager.cc
@@ -22,6 +22,7 @@
 #include <algorithm>
 #include <cstddef>
 #include <cstdint>
+#include <functional>
 #include <map>
 #include <memory>
 #include <mutex>
@@ -44,9 +45,6 @@
 #include "kudu/fs/error_manager.h"
 #include "kudu/fs/fs.pb.h"
 #include "kudu/fs/fs_report.h"
-#include "kudu/gutil/bind.h"
-#include "kudu/gutil/bind_helpers.h"
-#include "kudu/gutil/callback.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -502,7 +500,7 @@ class LogBlockContainer: public RefCountedThreadSafe<LogBlockContainer> {
   //
   // Normally the task is performed asynchronously. However, if submission to
   // the pool fails, it runs synchronously on the current thread.
-  void ExecClosure(const Closure& task);
+  void ExecClosure(const std::function<void()>& task);
 
   // Produces a debug-friendly string representation of this container.
   string ToString() const;
@@ -1337,7 +1335,7 @@ void LogBlockContainer::BlockDeleted(const LogBlockRefPtr& block) {
   live_blocks_.IncrementBy(-1);
 }
 
-void LogBlockContainer::ExecClosure(const Closure& task) {
+void LogBlockContainer::ExecClosure(const std::function<void()>& task) {
   data_dir_->ExecClosure(task);
 }
 
@@ -1496,11 +1494,11 @@ LogBlockDeletionTransaction::~LogBlockDeletionTransaction() {
                      Substitute("could not coalesce hole punching for container: $0",
                                 container->ToString()));
 
+    scoped_refptr<LogBlockContainer> self(container);
     for (const auto& interval : entry.second) {
-      container->ExecClosure(Bind(&LogBlockContainer::ContainerDeletionAsync,
-                                  container,
-                                  interval.first,
-                                  interval.second - interval.first));
+      container->ExecClosure([self, interval]() {
+        self->ContainerDeletionAsync(interval.first, interval.second - interval.first);
+      });
     }
   }
 }
@@ -2081,12 +2079,12 @@ Status LogBlockManager::Open(FsReport* report) {
     }
 
     // Open the data dir asynchronously.
-    dd->ExecClosure(
-        Bind(&LogBlockManager::OpenDataDir,
-             Unretained(this),
-             dd.get(),
-             &container_results[i],
-             &statuses[i]));
+    auto* dd_raw = dd.get();
+    auto* results = &container_results[i];
+    auto* s = &statuses[i];
+    dd->ExecClosure([this, dd_raw, results, s]() {
+      this->OpenDataDir(dd_raw, results, s);
+    });
   }
 
   // Wait for the opens to complete.
@@ -2136,8 +2134,9 @@ Status LogBlockManager::Open(FsReport* report) {
     }
     if (do_repair) {
       dir_results[i] = std::move(dir_result);
-      dd->ExecClosure(Bind(&LogBlockManager::RepairTask, Unretained(this),
-                           dd.get(), Unretained(dir_results[i].get())));
+      auto* dd_raw = dd.get();
+      auto* dr = dir_results[i].get();
+      dd->ExecClosure([this, dd_raw, dr]() { this->RepairTask(dd_raw, dr); });
     }
   }
 
@@ -2558,8 +2557,10 @@ void LogBlockManager::OpenDataDir(
     }
 
     // Load the container's records asynchronously.
-    dir->ExecClosure(Bind(&LogBlockManager::LoadContainer, Unretained(this),
-                          dir, container, Unretained(results->back().get())));
+    auto* r = results->back().get();
+    dir->ExecClosure([this, dir, container, r]() {
+      this->LoadContainer(dir, container, r);
+    });
   }
 }