You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/27 23:03:43 UTC
[3/5] kudu git commit: server: move apply_pool into KuduServer
server: move apply_pool into KuduServer
The server-wide apply_pool was instantiated in two places: SysCatalogTable
(for the master) and TSTabletManager (for the tserver). This commit moves it
into KuduServer and unifies the instantiation. Note: the apply pool
semantics have not changed.
Some interesting side effects:
1. The master will now generate apply pool metrics.
2. The apply pool is shut down a little bit later in server shutdown than it
was before, though I don't see any issues with this.
Change-Id: Ie7ffc886269aa6531517a52fef29c4408a925aed
Reviewed-on: http://gerrit.cloudera.org:8080/6984
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/74f99e40
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/74f99e40
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/74f99e40
Branch: refs/heads/master
Commit: 74f99e40da6ccab0b9042e7f38a50176db1d8899
Parents: 4c5c512
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon May 22 12:27:04 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 22:53:23 2017 +0000
----------------------------------------------------------------------
src/kudu/kserver/kserver.cc | 47 ++++++++++++++++++++++++++++++
src/kudu/kserver/kserver.h | 8 +++++
src/kudu/master/catalog_manager.cc | 1 -
src/kudu/master/sys_catalog.cc | 9 ++----
src/kudu/master/sys_catalog.h | 5 +---
src/kudu/tserver/tablet_server.cc | 2 +-
src/kudu/tserver/ts_tablet_manager.cc | 44 +++-------------------------
src/kudu/tserver/ts_tablet_manager.h | 8 +----
8 files changed, 65 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/kserver/kserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index 20268b4..855eded 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -21,7 +21,9 @@
#include <utility>
#include "kudu/server/server_base_options.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
+#include "kudu/util/threadpool.h"
using std::string;
@@ -31,6 +33,28 @@ using server::ServerBaseOptions;
namespace kserver {
+METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
+ MetricUnit::kTasks,
+ "Number of operations waiting to be applied to the tablet. "
+ "High queue lengths indicate that the server is unable to process "
+ "operations as fast as they are being written to the WAL.",
+ 10000, 2);
+
+METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
+ MetricUnit::kMicroseconds,
+ "Time that operations spent waiting in the apply queue before being "
+ "processed. High queue times indicate that the server is unable to "
+ "process operations as fast as they are being written to the WAL.",
+ 10000000, 2);
+
+METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
+ MetricUnit::kMicroseconds,
+ "Time that operations spent being applied to the tablet. "
+ "High values may indicate that the server is under-provisioned or "
+ "that operations consist of very large batches.",
+ 10000000, 2);
+
+
KuduServer::KuduServer(string name,
const ServerBaseOptions& options,
const string& metric_namespace)
@@ -39,6 +63,16 @@ KuduServer::KuduServer(string name,
Status KuduServer::Init() {
RETURN_NOT_OK(ServerBase::Init());
+
+ RETURN_NOT_OK(ThreadPoolBuilder("apply")
+ .Build(&tablet_apply_pool_));
+ tablet_apply_pool_->SetQueueLengthHistogram(
+ METRIC_op_apply_queue_length.Instantiate(metric_entity_));
+ tablet_apply_pool_->SetQueueTimeMicrosHistogram(
+ METRIC_op_apply_queue_time.Instantiate(metric_entity_));
+ tablet_apply_pool_->SetRunTimeMicrosHistogram(
+ METRIC_op_apply_run_time.Instantiate(metric_entity_));
+
return Status::OK();
}
@@ -48,6 +82,19 @@ Status KuduServer::Start() {
}
void KuduServer::Shutdown() {
+ // Shut down the messenger early, waiting for any reactor threads to finish
+ // running. This ensures that any ref-counted objects inside closures run by
+ // reactor threads will be destroyed before we shut down server-wide thread
+ // pools below, which is important because those objects may own tokens
+ // belonging to the pools.
+ //
+ // Note: prior to this call, it is assumed that any incoming RPCs deferred
+ // from reactor threads have already been cleaned up.
+ messenger_->Shutdown();
+
+ if (tablet_apply_pool_) {
+ tablet_apply_pool_->Shutdown();
+ }
ServerBase::Shutdown();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/kserver/kserver.h
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.h b/src/kudu/kserver/kserver.h
index 1f37886..56d62dd 100644
--- a/src/kudu/kserver/kserver.h
+++ b/src/kudu/kserver/kserver.h
@@ -19,11 +19,13 @@
#include <string>
+#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/macros.h"
#include "kudu/server/server_base.h"
namespace kudu {
class Status;
+class ThreadPool;
namespace server {
struct ServerBaseOptions;
@@ -54,7 +56,13 @@ class KuduServer : public server::ServerBase {
// Shuts down a KuduServer instance.
virtual void Shutdown() override;
+ ThreadPool* tablet_apply_pool() const { return tablet_apply_pool_.get(); }
+
private:
+
+ // Thread pool for applying transactions, shared between all tablets.
+ gscoped_ptr<ThreadPool> tablet_apply_pool_;
+
DISALLOW_COPY_AND_ASSIGN(KuduServer);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index de632f9..6a1a471 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1085,7 +1085,6 @@ Status CatalogManager::InitSysCatalogAsync(bool is_first_run) {
std::lock_guard<LockType> l(lock_);
unique_ptr<SysCatalogTable> new_catalog(
new SysCatalogTable(master_,
- master_->metric_registry(),
Bind(&CatalogManager::ElectedAsLeaderCb,
Unretained(this))));
if (is_first_run) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index bf7ae7a..2c251dc 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -96,7 +96,6 @@ const char* const SysCatalogTable::kSysCertAuthorityEntryId =
const char* const SysCatalogTable::kInjectedFailureStatusMsg =
"INJECTED FAILURE";
-
namespace {
// Return true if the two PBs are equal.
@@ -116,12 +115,11 @@ bool ArePBsEqual(const google::protobuf::Message& prev_pb,
} // anonymous namespace
-SysCatalogTable::SysCatalogTable(Master* master, MetricRegistry* metrics,
+SysCatalogTable::SysCatalogTable(Master* master,
ElectedLeaderCallback leader_cb)
- : metric_registry_(metrics),
+ : metric_registry_(master->metric_registry()),
master_(master),
leader_cb_(std::move(leader_cb)) {
- CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
}
SysCatalogTable::~SysCatalogTable() {
@@ -131,7 +129,6 @@ void SysCatalogTable::Shutdown() {
if (tablet_replica_) {
tablet_replica_->Shutdown();
}
- apply_pool_->Shutdown();
}
Status SysCatalogTable::Load(FsManager *fs_manager) {
@@ -310,7 +307,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
tablet_replica_.reset(new TabletReplica(
metadata,
local_peer_pb_,
- apply_pool_.get(),
+ master_->tablet_apply_pool(),
Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
consensus::ConsensusBootstrapInfo consensus_info;
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 1f306b6..36456f3 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -108,8 +108,7 @@ class SysCatalogTable {
// the consensus configuration's progress, any long running tasks (e.g., scanning
// tablets) should be performed asynchronously (by, e.g., submitting
// them to a to a separate threadpool).
- SysCatalogTable(Master* master, MetricRegistry* metrics,
- ElectedLeaderCallback leader_cb);
+ SysCatalogTable(Master* master, ElectedLeaderCallback leader_cb);
~SysCatalogTable();
@@ -248,8 +247,6 @@ class SysCatalogTable {
MetricRegistry* metric_registry_;
- gscoped_ptr<ThreadPool> apply_pool_;
-
scoped_refptr<tablet::TabletReplica> tablet_replica_;
Master* master_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 658ecfb..bf98057 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -46,7 +46,7 @@ TabletServer::TabletServer(const TabletServerOptions& opts)
initted_(false),
fail_heartbeats_for_tests_(false),
opts_(opts),
- tablet_manager_(new TSTabletManager(fs_manager_.get(), this, metric_registry())),
+ tablet_manager_(new TSTabletManager(this)),
scanner_manager_(new ScannerManager(metric_entity())),
path_handlers_(new TabletServerPathHandlers(this)),
maintenance_manager_(new MaintenanceManager(MaintenanceManager::kDefaultOptions)) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 1553fca..866374b 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -54,7 +54,6 @@
#include "kudu/util/env_util.h"
#include "kudu/util/fault_injection.h"
#include "kudu/util/flag_tags.h"
-#include "kudu/util/metrics.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/trace.h"
@@ -104,29 +103,7 @@ TAG_FLAG(fault_crash_after_tc_files_fetched, unsafe);
namespace kudu {
namespace tserver {
-METRIC_DEFINE_histogram(server, op_apply_queue_length, "Operation Apply Queue Length",
- MetricUnit::kTasks,
- "Number of operations waiting to be applied to the tablet. "
- "High queue lengths indicate that the server is unable to process "
- "operations as fast as they are being written to the WAL.",
- 10000, 2);
-
-METRIC_DEFINE_histogram(server, op_apply_queue_time, "Operation Apply Queue Time",
- MetricUnit::kMicroseconds,
- "Time that operations spent waiting in the apply queue before being "
- "processed. High queue times indicate that the server is unable to "
- "process operations as fast as they are being written to the WAL.",
- 10000000, 2);
-
-METRIC_DEFINE_histogram(server, op_apply_run_time, "Operation Apply Run Time",
- MetricUnit::kMicroseconds,
- "Time that operations spent being applied to the tablet. "
- "High values may indicate that the server is under-provisioned or "
- "that operations consist of very large batches.",
- 10000000, 2);
-
using consensus::ConsensusMetadata;
-using consensus::ConsensusStatePB;
using consensus::OpId;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
@@ -150,21 +127,11 @@ using tablet::TabletMetadata;
using tablet::TabletReplica;
using tserver::TabletCopyClient;
-TSTabletManager::TSTabletManager(FsManager* fs_manager,
- TabletServer* server,
- MetricRegistry* metric_registry)
- : fs_manager_(fs_manager),
+TSTabletManager::TSTabletManager(TabletServer* server)
+ : fs_manager_(server->fs_manager()),
server_(server),
- metric_registry_(metric_registry),
+ metric_registry_(server->metric_registry()),
state_(MANAGER_INITIALIZING) {
-
- CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
- apply_pool_->SetQueueLengthHistogram(
- METRIC_op_apply_queue_length.Instantiate(server_->metric_entity()));
- apply_pool_->SetQueueTimeMicrosHistogram(
- METRIC_op_apply_queue_time.Instantiate(server_->metric_entity()));
- apply_pool_->SetRunTimeMicrosHistogram(
- METRIC_op_apply_run_time.Instantiate(server_->metric_entity()));
}
TSTabletManager::~TSTabletManager() {
@@ -594,7 +561,7 @@ scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
scoped_refptr<TabletReplica> replica(
new TabletReplica(meta,
local_peer_pb_,
- apply_pool_.get(),
+ server_->tablet_apply_pool(),
Bind(&TSTabletManager::MarkTabletDirty,
Unretained(this),
meta->tablet_id())));
@@ -877,9 +844,6 @@ void TSTabletManager::Shutdown() {
replica->Shutdown();
}
- // Shut down the apply pool.
- apply_pool_->Shutdown();
-
{
std::lock_guard<rw_spinlock> l(lock_);
// We don't expect anyone else to be modifying the map after we start the
http://git-wip-us.apache.org/repos/asf/kudu/blob/74f99e40/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 13fd87f..66f1b4d 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -78,10 +78,7 @@ class TransitionInProgressDeleter;
class TSTabletManager : public tserver::TabletReplicaLookupIf {
public:
// Construct the tablet manager.
- // 'fs_manager' must remain valid until this object is destructed.
- TSTabletManager(FsManager* fs_manager,
- TabletServer* server,
- MetricRegistry* metric_registry);
+ explicit TSTabletManager(TabletServer* server);
virtual ~TSTabletManager();
@@ -315,9 +312,6 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
// Thread pool used to open the tablets async, whether bootstrap is required or not.
gscoped_ptr<ThreadPool> open_tablet_pool_;
- // Thread pool for apply transactions, shared between all tablets.
- gscoped_ptr<ThreadPool> apply_pool_;
-
DISALLOW_COPY_AND_ASSIGN(TSTabletManager);
};