You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/27 23:03:43 UTC

[3/5] kudu git commit: server: move apply_pool into KuduServer

server: move apply_pool into KuduServer

The server-wide apply_pool was instantiated in two places: SysCatalogTable
(for the master) and TSTabletManager (for the tserver). This commit moves it
into KuduServer and unifies the instantiation. Note: the apply pool
semantics have not changed.

Some interesting side effects:
1. The master will now generate apply pool metrics.
2. The apply pool is now shut down slightly later during server shutdown than
   it was before, though this does not appear to cause any issues.

Change-Id: Ie7ffc886269aa6531517a52fef29c4408a925aed
Reviewed-on: http://gerrit.cloudera.org:8080/6984
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/74f99e40
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/74f99e40
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/74f99e40

Branch: refs/heads/master
Commit: 74f99e40da6ccab0b9042e7f38a50176db1d8899
Parents: 4c5c512
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon May 22 12:27:04 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 22:53:23 2017 +0000

----------------------------------------------------------------------
 src/kudu/kserver/kserver.cc           | 47 ++++++++++++++++++++++++++++++
 src/kudu/kserver/kserver.h            |  8 +++++
 src/kudu/master/catalog_manager.cc    |  1 -
 src/kudu/master/sys_catalog.cc        |  9 ++----
 src/kudu/master/sys_catalog.h         |  5 +---
 src/kudu/tserver/tablet_server.cc     |  2 +-
 src/kudu/tserver/ts_tablet_manager.cc | 44 +++-------------------------
 src/kudu/tserver/ts_tablet_manager.h  |  8 +----
 8 files changed, 65 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/kserver/kserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 20268b4..855eded 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -21,7 +21,9 @@
 #include <utility>
 
 #include "kudu/server/server_base_options.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
 
 using std::string;
 
@@ -31,6 +33,28 @@ using server::ServerBaseOptions;
 
 namespace kserver {
 
+METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
+                        MetricUnit::kTasks,
+                        "Number of operations waiting to be applied to the tablet. "
+                        "High queue lengths indicate that the server is unable to process "
+                        "operations as fast as they are being written to the WAL.",
+                        10000, 2);
+
+METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
+                        MetricUnit::kMicroseconds,
+                        "Time that operations spent waiting in the apply queue before being "
+                        "processed. High queue times indicate that the server is unable to "
+                        "process operations as fast as they are being written to the WAL.",
+                        10000000, 2);
+
+METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
+                        MetricUnit::kMicroseconds,
+                        "Time that operations spent being applied to the tablet. "
+                        "High values may indicate that the server is under-provisioned or "
+                        "that operations consist of very large batches.",
+                        10000000, 2);
+
+
 KuduServer::KuduServer(string name,
                        const ServerBaseOptions& options,
                        const string& metric_namespace)
@@ -39,6 +63,16 @@ KuduServer::KuduServer(string name,
 
 Status KuduServer::Init() {
   RETURN_NOT_OK(ServerBase::Init());
+
+  RETURN_NOT_OK(ThreadPoolBuilder("apply")
+                .Build(&tablet_apply_pool_));
+  tablet_apply_pool_->SetQueueLengthHistogram(
+      METRIC_op_apply_queue_length.Instantiate(metric_entity_));
+  tablet_apply_pool_->SetQueueTimeMicrosHistogram(
+      METRIC_op_apply_queue_time.Instantiate(metric_entity_));
+  tablet_apply_pool_->SetRunTimeMicrosHistogram(
+      METRIC_op_apply_run_time.Instantiate(metric_entity_));
+
   return Status::OK();
 }
 
@@ -48,6 +82,19 @@ Status KuduServer::Start() {
 }
 
 void KuduServer::Shutdown() {
+  // Shut down the messenger early, waiting for any reactor threads to finish
+  // running. This ensures that any ref-counted objects inside closures run by
+  // reactor threads will be destroyed before we shut down server-wide thread
+  // pools below, which is important because those objects may own tokens
+  // belonging to the pools.
+  //
+  // Note: prior to this call, it is assumed that any incoming RPCs deferred
+  // from reactor threads have already been cleaned up.
+  messenger_->Shutdown();
+
+  if (tablet_apply_pool_) {
+    tablet_apply_pool_->Shutdown();
+  }
   ServerBase::Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/kserver/kserver.h
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.h b/src/kudu/kserver/kserver.h
index 1f37886..56d62dd 100644
--- a/src/kudu/kserver/kserver.h
+++ b/src/kudu/kserver/kserver.h
@@ -19,11 +19,13 @@
 
 #include <string>
 
+#include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/server/server_base.h"
 
 namespace kudu {
 class Status;
+class ThreadPool;
 
 namespace server {
 struct ServerBaseOptions;
@@ -54,7 +56,13 @@ class KuduServer : public server::ServerBase {
   // Shuts down a KuduServer instance.
   virtual void Shutdown() override;
 
+  ThreadPool* tablet_apply_pool() const { return tablet_apply_pool_.get(); }
+
  private:
+
+  // Thread pool for applying transactions, shared between all tablets.
+  gscoped_ptr<ThreadPool> tablet_apply_pool_;
+
   DISALLOW_COPY_AND_ASSIGN(KuduServer);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index de632f9..6a1a471 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1085,7 +1085,6 @@ Status CatalogManager::InitSysCatalogAsync(bool is_first_run) {
   std::lock_guard<LockType> l(lock_);
   unique_ptr<SysCatalogTable> new_catalog(
       new SysCatalogTable(master_,
-                          master_->metric_registry(),
                           Bind(&CatalogManager::ElectedAsLeaderCb,
                                Unretained(this))));
   if (is_first_run) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index bf7ae7a..2c251dc 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -96,7 +96,6 @@ const char* const SysCatalogTable::kSysCertAuthorityEntryId =
 const char* const SysCatalogTable::kInjectedFailureStatusMsg =
     "INJECTED FAILURE";
 
-
 namespace {
 
 // Return true if the two PBs are equal.
@@ -116,12 +115,11 @@ bool ArePBsEqual(const google::protobuf::Message& prev_pb,
 } // anonymous namespace
 
 
-SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics,
+SysCatalogTable::SysCatalogTable(Master* master,
                                  ElectedLeaderCallback leader_cb)
-    : metric_registry_(metrics),
+    : metric_registry_(master->metric_registry()),
       master_(master),
       leader_cb_(std::move(leader_cb)) {
-  CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
 }
 
 SysCatalogTable::~SysCatalogTable() {
@@ -131,7 +129,6 @@ void SysCatalogTable::Shutdown() {
   if (tablet_replica_) {
     tablet_replica_->Shutdown();
   }
-  apply_pool_->Shutdown();
 }
 
 Status SysCatalogTable::Load(FsManager *fs_manager) {
@@ -310,7 +307,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
   tablet_replica_.reset(new TabletReplica(
       metadata,
       local_peer_pb_,
-      apply_pool_.get(),
+      master_->tablet_apply_pool(),
       Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
 
   consensus::ConsensusBootstrapInfo consensus_info;

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 1f306b6..36456f3 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -108,8 +108,7 @@ class SysCatalogTable {
   // the consensus configuration's progress, any long running tasks (e.g., scanning
   // tablets) should be performed asynchronously (by, e.g., submitting
   // them to a to a separate threadpool).
-  SysCatalogTable(Master* master, MetricRegistry* metrics,
-                  ElectedLeaderCallback leader_cb);
+  SysCatalogTable(Master* master,  ElectedLeaderCallback leader_cb);
 
   ~SysCatalogTable();
 
@@ -248,8 +247,6 @@ class SysCatalogTable {
 
   MetricRegistry* metric_registry_;
 
-  gscoped_ptr<ThreadPool> apply_pool_;
-
   scoped_refptr<tablet::TabletReplica> tablet_replica_;
 
   Master* master_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 658ecfb..bf98057 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -46,7 +46,7 @@ TabletServer::TabletServer(const TabletServerOptions& opts)
     initted_(false),
     fail_heartbeats_for_tests_(false),
     opts_(opts),
-    tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
+    tablet_manager_(new TSTabletManager(this)),
     scanner_manager_(new ScannerManager(metric_entity())),
     path_handlers_(new TabletServerPathHandlers(this)),
     maintenance_manager_(new MaintenanceManager(MaintenanceManager::kDefaultOptions)) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 1553fca..866374b 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -54,7 +54,6 @@
 #include "kudu/util/env_util.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
-#include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
@@ -104,29 +103,7 @@ TAG_FLAG(fault_crash_after_tc_files_fetched, unsafe);
 namespace kudu {
 namespace tserver {
 
-METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
-                        MetricUnit::kTasks,
-                        "Number of operations waiting to be applied to the tablet. "
-                        "High queue lengths indicate that the server is unable to process "
-                        "operations as fast as they are being written to the WAL.",
-                        10000, 2);
-
-METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
-                        MetricUnit::kMicroseconds,
-                        "Time that operations spent waiting in the apply queue before being "
-                        "processed. High queue times indicate that the server is unable to "
-                        "process operations as fast as they are being written to the WAL.",
-                        10000000, 2);
-
-METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
-                        MetricUnit::kMicroseconds,
-                        "Time that operations spent being applied to the tablet. "
-                        "High values may indicate that the server is under-provisioned or "
-                        "that operations consist of very large batches.",
-                        10000000, 2);
-
 using consensus::ConsensusMetadata;
-using consensus::ConsensusStatePB;
 using consensus::OpId;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
@@ -150,21 +127,11 @@ using tablet::TabletMetadata;
 using tablet::TabletReplica;
 using tserver::TabletCopyClient;
 
-TSTabletManager::TSTabletManager(FsManager* fs_manager,
-                                 TabletServer* server,
-                                 MetricRegistry* metric_registry)
-  : fs_manager_(fs_manager),
+TSTabletManager::TSTabletManager(TabletServer* server)
+  : fs_manager_(server->fs_manager()),
     server_(server),
-    metric_registry_(metric_registry),
+    metric_registry_(server->metric_registry()),
     state_(MANAGER_INITIALIZING) {
-
-  CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
-  apply_pool_->SetQueueLengthHistogram(
-      METRIC_op_apply_queue_length.Instantiate(server_->metric_entity()));
-  apply_pool_->SetQueueTimeMicrosHistogram(
-      METRIC_op_apply_queue_time.Instantiate(server_->metric_entity()));
-  apply_pool_->SetRunTimeMicrosHistogram(
-      METRIC_op_apply_run_time.Instantiate(server_->metric_entity()));
 }
 
 TSTabletManager::~TSTabletManager() {
@@ -594,7 +561,7 @@ scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
   scoped_refptr<TabletReplica> replica(
       new TabletReplica(meta,
                         local_peer_pb_,
-                        apply_pool_.get(),
+                        server_->tablet_apply_pool(),
                         Bind(&TSTabletManager::MarkTabletDirty,
                              Unretained(this),
                              meta->tablet_id())));
@@ -877,9 +844,6 @@ void TSTabletManager::Shutdown() {
     replica->Shutdown();
   }
 
-  // Shut down the apply pool.
-  apply_pool_->Shutdown();
-
   {
     std::lock_guard<rw_spinlock> l(lock_);
     // We don't expect anyone else to be modifying the map after we start the

http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 13fd87f..66f1b4d 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -78,10 +78,7 @@ class TransitionInProgressDeleter;
 class TSTabletManager : public tserver::TabletReplicaLookupIf {
  public:
   // Construct the tablet manager.
-  // 'fs_manager' must remain valid until this object is destructed.
-  TSTabletManager(FsManager* fs_manager,
-                  TabletServer* server,
-                  MetricRegistry* metric_registry);
+  explicit TSTabletManager(TabletServer* server);
 
   virtual ~TSTabletManager();
 
@@ -315,9 +312,6 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Thread pool used to open the tablets async, whether bootstrap is required or not.
   gscoped_ptr<ThreadPool> open_tablet_pool_;
 
-  // Thread pool for apply transactions, shared between all tablets.
-  gscoped_ptr<ThreadPool> apply_pool_;
-
   DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
 };