You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/07/15 00:57:43 UTC
[1/6] incubator-kudu git commit: Disable exactly once semantics by
default and add a flag to enable it for tests
Repository: incubator-kudu
Updated Branches:
refs/heads/master b666cc07e -> 59ab14b9a
Disable exactly once semantics by default and add a flag to enable it for tests
Since exactly once semantics is still missing some pieces, like garbage collection
this disables it by default on the server, but adds a flag to allow enabling
it and enables it in all tablet server tests, by default.
Change-Id: I77096be608afb31194f62f04a946bd3f42537a35
Reviewed-on: http://gerrit.cloudera.org:8080/3506
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/257ba292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/257ba292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/257ba292
Branch: refs/heads/master
Commit: 257ba2923ad962193fbca49b3d22b9fb6a578509
Parents: b666cc0
Author: David Alves <da...@cloudera.com>
Authored: Mon Jun 27 02:09:09 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 22:57:20 2016 +0000
----------------------------------------------------------------------
.../integration-tests/external_mini_cluster-itest-base.h | 2 ++
src/kudu/integration-tests/remote_bootstrap-itest.cc | 3 +++
src/kudu/integration-tests/ts_itest-base.h | 4 ++++
src/kudu/rpc/rpc-test-base.h | 3 +++
src/kudu/rpc/service_if.cc | 10 +++++++++-
src/kudu/tserver/tablet_server-test-base.h | 5 +++++
6 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/external_mini_cluster-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.h b/src/kudu/integration-tests/external_mini_cluster-itest-base.h
index e2a9cf5..0fde8d4 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.h
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.h
@@ -77,6 +77,8 @@ void ExternalMiniClusterITestBase::StartCluster(const std::vector<std::string>&
opts.num_tablet_servers = num_tablet_servers;
opts.extra_master_flags = extra_master_flags;
opts.extra_tserver_flags = extra_ts_flags;
+ // TODO remove when this is enabled by default.
+ opts.extra_tserver_flags.push_back("--enable_exactly_once");
opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
cluster_.reset(new ExternalMiniCluster(opts));
ASSERT_OK(cluster_->Start());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/remote_bootstrap-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/remote_bootstrap-itest.cc b/src/kudu/integration-tests/remote_bootstrap-itest.cc
index e9bf883..661ed21 100644
--- a/src/kudu/integration-tests/remote_bootstrap-itest.cc
+++ b/src/kudu/integration-tests/remote_bootstrap-itest.cc
@@ -112,6 +112,9 @@ void RemoteBootstrapITest::StartCluster(const vector<string>& extra_tserver_flag
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tablet_servers;
opts.extra_tserver_flags = extra_tserver_flags;
+ // Enable EO semantics for tests.
+ // TODO remove this once EO is the default.
+ opts.extra_tserver_flags.push_back("--enable_exactly_once");
opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
opts.extra_master_flags = extra_master_flags;
cluster_.reset(new ExternalMiniCluster(opts));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index ad38b3e..225c942 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -97,6 +97,10 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
opts.num_tablet_servers = FLAGS_num_tablet_servers;
opts.data_root = GetTestPath(data_root_path);
+ // Enable exactly once semantics for tests.
+ // TODO remove this once we have ResultTracker GC
+ opts.extra_tserver_flags.push_back("--enable_exactly_once");
+
// If the caller passed no flags use the default ones, where we stress consensus by setting
// low timeouts and frequent cache misses.
if (non_default_ts_flags.empty()) {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index ec0edc9..7b82b08 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -45,6 +45,8 @@
#include "kudu/util/test_util.h"
#include "kudu/util/trace.h"
+DECLARE_bool(enable_exactly_once);
+
namespace kudu { namespace rpc {
using kudu::rpc_test::AddRequestPB;
@@ -318,6 +320,7 @@ class RpcTestBase : public KuduTest {
n_server_reactor_threads_(3),
keepalive_time_ms_(1000),
metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
+ FLAGS_enable_exactly_once = true;
}
void SetUp() override {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index 4a8c387..8b47ce9 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -27,6 +27,12 @@
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/flag_tags.h"
+
+// TODO remove this once we have ResultTracker GC
+DEFINE_bool(enable_exactly_once, false, "Whether to enable exactly once semantics on the client "
+ "(experimental).");
+TAG_FLAG(enable_exactly_once, experimental);
using google::protobuf::Message;
using std::string;
@@ -92,7 +98,9 @@ void GeneratedServiceIf::Handle(InboundCall *call) {
}
Message* resp = method_info->resp_prototype->New();
- bool track_result = call->header().has_request_id() && method_info->track_result;
+ bool track_result = call->header().has_request_id()
+ && method_info->track_result
+ && FLAGS_enable_exactly_once;
RpcContext* ctx = new RpcContext(call,
req.release(),
resp,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 8f0b757..1d5022e 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -58,6 +58,7 @@
DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
DECLARE_bool(log_force_fsync_all);
+DECLARE_bool(enable_exactly_once);
DECLARE_bool(enable_maintenance_manager);
DECLARE_bool(enable_data_block_fsync);
DECLARE_int32(heartbeat_rpc_timeout_ms);
@@ -80,6 +81,10 @@ class TabletServerTestBase : public KuduTest {
// maintenance operations at predetermined times.
FLAGS_enable_maintenance_manager = false;
+ // Enable exactly once semantics, for tests.
+ // TODO remove this once we have ResultTracker GC
+ FLAGS_enable_exactly_once = true;
+
// Decrease heartbeat timeout: we keep re-trying heartbeats when a
// single master server fails due to a network error. Decreasing
// the hearbeat timeout to 1 second speeds up unit tests which
[4/6] incubator-kudu git commit: Add a way to include request ids in
log-dump
Posted by mp...@apache.org.
Add a way to include request ids in log-dump
This adds a way to output the request ids, if they exist, in log-dump.cc.
This is helpful when debugging as it allows to see which writes the
request ids get associated with.
Change-Id: I88d7c65887a98544ee83b5b4bc0817bea7222131
Reviewed-on: http://gerrit.cloudera.org:8080/3612
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/0412fa76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/0412fa76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/0412fa76
Branch: refs/heads/master
Commit: 0412fa7686d70720b7ac67b66ae85dc832963636
Parents: ee38093
Author: David Alves <da...@cloudera.com>
Authored: Mon Jul 4 14:58:53 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Jul 15 00:05:41 2016 +0000
----------------------------------------------------------------------
src/kudu/consensus/log-dump.cc | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0412fa76/src/kudu/consensus/log-dump.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-dump.cc b/src/kudu/consensus/log-dump.cc
index e26d778..8bab8fc 100644
--- a/src/kudu/consensus/log-dump.cc
+++ b/src/kudu/consensus/log-dump.cc
@@ -28,6 +28,7 @@
#include "kudu/consensus/log_reader.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/numbers.h"
+#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/flags.h"
#include "kudu/util/logging.h"
@@ -104,7 +105,8 @@ void PrintIdOnly(const LogEntryPB& entry) {
Status PrintDecodedWriteRequestPB(const string& indent,
const Schema& tablet_schema,
- const WriteRequestPB& write) {
+ const WriteRequestPB& write,
+ const rpc::RequestIdPB* request_id) {
Schema request_schema;
RETURN_NOT_OK(SchemaFromPB(write.schema(), &request_schema));
@@ -114,6 +116,8 @@ Status PrintDecodedWriteRequestPB(const string& indent,
RETURN_NOT_OK(dec.DecodeOperations(&ops));
cout << indent << "Tablet: " << write.tablet_id() << endl;
+ cout << indent << "RequestId: "
+ << (request_id ? request_id->ShortDebugString() : "None") << endl;
cout << indent << "Consistency: "
<< ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
if (write.has_propagated_timestamp()) {
@@ -139,7 +143,11 @@ Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
const ReplicateMsg& replicate = entry.replicate();
if (replicate.op_type() == consensus::WRITE_OP) {
- RETURN_NOT_OK(PrintDecodedWriteRequestPB(indent, tablet_schema, replicate.write_request()));
+ RETURN_NOT_OK(PrintDecodedWriteRequestPB(
+ indent,
+ tablet_schema,
+ replicate.write_request(),
+ replicate.has_request_id() ? &replicate.request_id() : nullptr));
} else {
cout << indent << replicate.ShortDebugString() << endl;
}
[6/6] incubator-kudu git commit: KUDU-1530: Update docs about OS X
build dependency on Xcode package
Posted by mp...@apache.org.
KUDU-1530: Update docs about OS X build dependency on Xcode package
Change-Id: I8962d5539f437dba8b120c70c90c1e384ed550c9
Reviewed-on: http://gerrit.cloudera.org:8080/3653
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/59ab14b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/59ab14b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/59ab14b9
Branch: refs/heads/master
Commit: 59ab14b9a8927dd08b34c3e4e6be9649f6e7dd9c
Parents: 6d2679b
Author: Dinesh Bhat <di...@cloudera.com>
Authored: Thu Jul 14 12:25:36 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jul 15 00:57:01 2016 +0000
----------------------------------------------------------------------
docs/installation.adoc | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/59ab14b9/docs/installation.adoc
----------------------------------------------------------------------
diff --git a/docs/installation.adoc b/docs/installation.adoc
index 2c3918c..c7a343f 100644
--- a/docs/installation.adoc
+++ b/docs/installation.adoc
@@ -471,10 +471,9 @@ make -j4
[[osx_from_source]]
=== OS X
-The Xcode toolchain is necessary for compiling Kudu. Use `xcode-select --install`
-to install the Xcode Command Line Tools if Xcode is not already installed. These
-instructions use link:http://brew.sh/[Homebrew] to install dependencies, but
-manual dependency installation is possible.
+The link:https://developer.apple.com/xcode/[Xcode] package is necessary for
+compiling Kudu. Some of the instructions below use link:http://brew.sh/[Homebrew]
+to install dependencies, but manual dependency installation is possible.
[WARNING]
.OS X Known Issues
[2/6] incubator-kudu git commit: Move the maintenance manager to util
Posted by mp...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.proto
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto
new file mode 100644
index 0000000..04b4519
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.proto
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package kudu;
+
+option java_package = "org.kududb";
+
+// Used to present the maintenance manager's internal state.
+message MaintenanceManagerStatusPB {
+ message MaintenanceOpPB {
+ required string name = 1;
+ // Number of times this operation is currently running.
+ required uint32 running = 2;
+ required bool runnable = 3;
+ required uint64 ram_anchored_bytes = 4;
+ required int64 logs_retained_bytes = 5;
+ required double perf_improvement = 6;
+ }
+
+ message CompletedOpPB {
+ required string name = 1;
+ required int32 duration_millis = 2;
+ // Number of seconds since this operation started.
+ required int32 secs_since_start = 3;
+ }
+
+ // The next operation that would run.
+ optional MaintenanceOpPB best_op = 1;
+
+ // List of all the operations.
+ repeated MaintenanceOpPB registered_operations = 2;
+
+ // This list isn't in order of anything. Can contain the same operation mutiple times.
+ repeated CompletedOpPB completed_operations = 3;
+}
\ No newline at end of file
[3/6] incubator-kudu git commit: Move the maintenance manager to util
Posted by mp...@apache.org.
Move the maintenance manager to util
This moves the maintenance manager to util, along with its required protobufs.
This move is required as we'll need a server-level MaintenanceMananager to
run garbage collection on the ResultTracker.
Change-Id: Ifcf072d443ac3d069bda15b9dc0f8c442b9ac5c0
Reviewed-on: http://gerrit.cloudera.org:8080/3656
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/ee380939
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/ee380939
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/ee380939
Branch: refs/heads/master
Commit: ee38093949fd7ccb773025e6ec7e8e60daa31149
Parents: 257ba29
Author: dralves <dr...@apache.org>
Authored: Thu Jul 14 14:29:26 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 23:48:30 2016 +0000
----------------------------------------------------------------------
.../full_stack-insert-scan-test.cc | 2 +-
src/kudu/master/master.cc | 2 +-
src/kudu/tablet/CMakeLists.txt | 2 -
src/kudu/tablet/maintenance_manager-test.cc | 287 -------------
src/kudu/tablet/maintenance_manager.cc | 419 -------------------
src/kudu/tablet/maintenance_manager.h | 280 -------------
src/kudu/tablet/tablet.cc | 2 +-
src/kudu/tablet/tablet.proto | 29 --
src/kudu/tablet/tablet_mm_ops.h | 2 +-
src/kudu/tablet/tablet_peer-test.cc | 2 +-
src/kudu/tablet/tablet_peer_mm_ops.cc | 2 +-
src/kudu/tablet/tablet_peer_mm_ops.h | 2 +-
src/kudu/tserver/mini_tablet_server.cc | 2 +-
src/kudu/tserver/tablet_server-test-base.h | 2 +-
src/kudu/tserver/tablet_server.cc | 2 +-
src/kudu/tserver/tserver-path-handlers.cc | 8 +-
src/kudu/util/CMakeLists.txt | 17 +
src/kudu/util/maintenance_manager-test.cc | 286 +++++++++++++
src/kudu/util/maintenance_manager.cc | 415 ++++++++++++++++++
src/kudu/util/maintenance_manager.h | 277 ++++++++++++
src/kudu/util/maintenance_manager.proto | 48 +++
21 files changed, 1057 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/integration-tests/full_stack-insert-scan-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index 78f2f9d..5a1a61b 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -37,7 +37,6 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/master/mini_master.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_peer.h"
@@ -47,6 +46,7 @@
#include "kudu/util/async_util.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/errno.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index adb9533..c42d7f0 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -36,9 +36,9 @@
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/server/rpc_server.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tserver/tablet_service.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index e7e94c9..c6baa67 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -33,7 +33,6 @@ set(TABLET_SRCS
delta_key.cc
diskrowset.cc
lock_manager.cc
- maintenance_manager.cc
memrowset.cc
multi_column_writer.cc
mutation.cc
@@ -98,7 +97,6 @@ ADD_KUDU_TEST(cfile_set-test)
ADD_KUDU_TEST(tablet-pushdown-test)
ADD_KUDU_TEST(tablet-schema-test)
ADD_KUDU_TEST(tablet_bootstrap-test)
-ADD_KUDU_TEST(maintenance_manager-test)
ADD_KUDU_TEST(metadata-test)
ADD_KUDU_TEST(mvcc-test)
ADD_KUDU_TEST(compaction-test)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager-test.cc b/src/kudu/tablet/maintenance_manager-test.cc
deleted file mode 100644
index 663ed47..0000000
--- a/src/kudu/tablet/maintenance_manager-test.cc
+++ /dev/null
@@ -1,287 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/maintenance_manager.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/thread.h"
-
-using kudu::tablet::MaintenanceManagerStatusPB;
-using std::shared_ptr;
-using std::vector;
-using strings::Substitute;
-
-METRIC_DEFINE_entity(test);
-METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
- "Number of Maintenance Operations Running",
- kudu::MetricUnit::kMaintenanceOperations,
- "The number of background maintenance operations currently running.");
-METRIC_DEFINE_histogram(test, maintenance_op_duration,
- "Maintenance Operation Duration",
- kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
-
-namespace kudu {
-
-const int kHistorySize = 4;
-
-class MaintenanceManagerTest : public KuduTest {
- public:
- MaintenanceManagerTest() {
- test_tracker_ = MemTracker::CreateTracker(1000, "test");
- MaintenanceManager::Options options;
- options.num_threads = 2;
- options.polling_interval_ms = 1;
- options.history_size = kHistorySize;
- options.parent_mem_tracker = test_tracker_;
- manager_.reset(new MaintenanceManager(options));
- manager_->Init();
- }
- ~MaintenanceManagerTest() {
- manager_->Shutdown();
- }
-
- protected:
- shared_ptr<MemTracker> test_tracker_;
- shared_ptr<MaintenanceManager> manager_;
-};
-
-// Just create the MaintenanceManager and then shut it down, to make sure
-// there are no race conditions there.
-TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
-}
-
-enum TestMaintenanceOpState {
- OP_DISABLED,
- OP_RUNNABLE,
- OP_RUNNING,
- OP_FINISHED,
-};
-
-class TestMaintenanceOp : public MaintenanceOp {
- public:
- TestMaintenanceOp(const std::string& name,
- IOUsage io_usage,
- TestMaintenanceOpState state,
- const shared_ptr<MemTracker>& tracker)
- : MaintenanceOp(name, io_usage),
- state_change_cond_(&lock_),
- state_(state),
- consumption_(tracker, 500),
- logs_retained_bytes_(0),
- perf_improvement_(0),
- metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
- maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
- maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)) {
- }
-
- virtual ~TestMaintenanceOp() {}
-
- virtual bool Prepare() OVERRIDE {
- std::lock_guard<Mutex> guard(lock_);
- if (state_ != OP_RUNNABLE) {
- return false;
- }
- state_ = OP_RUNNING;
- state_change_cond_.Broadcast();
- DLOG(INFO) << "Prepared op " << name();
- return true;
- }
-
- virtual void Perform() OVERRIDE {
- DLOG(INFO) << "Performing op " << name();
- std::lock_guard<Mutex> guard(lock_);
- CHECK_EQ(OP_RUNNING, state_);
- state_ = OP_FINISHED;
- state_change_cond_.Broadcast();
- }
-
- virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
- std::lock_guard<Mutex> guard(lock_);
- stats->set_runnable(state_ == OP_RUNNABLE);
- stats->set_ram_anchored(consumption_.consumption());
- stats->set_logs_retained_bytes(logs_retained_bytes_);
- stats->set_perf_improvement(perf_improvement_);
- }
-
- void Enable() {
- std::lock_guard<Mutex> guard(lock_);
- DCHECK((state_ == OP_DISABLED) || (state_ == OP_FINISHED));
- state_ = OP_RUNNABLE;
- state_change_cond_.Broadcast();
- }
-
- void WaitForState(TestMaintenanceOpState state) {
- std::lock_guard<Mutex> guard(lock_);
- while (true) {
- if (state_ == state) {
- return;
- }
- state_change_cond_.Wait();
- }
- }
-
- bool WaitForStateWithTimeout(TestMaintenanceOpState state, int ms) {
- MonoDelta to_wait = MonoDelta::FromMilliseconds(ms);
- std::lock_guard<Mutex> guard(lock_);
- while (true) {
- if (state_ == state) {
- return true;
- }
- if (!state_change_cond_.TimedWait(to_wait)) {
- return false;
- }
- }
- }
-
- void set_ram_anchored(uint64_t ram_anchored) {
- std::lock_guard<Mutex> guard(lock_);
- consumption_.Reset(ram_anchored);
- }
-
- void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
- std::lock_guard<Mutex> guard(lock_);
- logs_retained_bytes_ = logs_retained_bytes;
- }
-
- void set_perf_improvement(uint64_t perf_improvement) {
- std::lock_guard<Mutex> guard(lock_);
- perf_improvement_ = perf_improvement;
- }
-
- virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
- return maintenance_op_duration_;
- }
-
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
- return maintenance_ops_running_;
- }
-
- private:
- Mutex lock_;
- ConditionVariable state_change_cond_;
- enum TestMaintenanceOpState state_;
- ScopedTrackedConsumption consumption_;
- uint64_t logs_retained_bytes_;
- uint64_t perf_improvement_;
- MetricRegistry metric_registry_;
- scoped_refptr<MetricEntity> metric_entity_;
- scoped_refptr<Histogram> maintenance_op_duration_;
- scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
-};
-
-// Create an op and wait for it to start running. Unregister it while it is
-// running and verify that UnregisterOp waits for it to finish before
-// proceeding.
-TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
- TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, OP_DISABLED, test_tracker_);
- op1.set_ram_anchored(1001);
- manager_->RegisterOp(&op1);
- scoped_refptr<kudu::Thread> thread;
- CHECK_OK(Thread::Create("TestThread", "TestRegisterUnregister",
- boost::bind(&TestMaintenanceOp::Enable, &op1), &thread));
- op1.WaitForState(OP_FINISHED);
- manager_->UnregisterOp(&op1);
- ThreadJoiner(thread.get()).Join();
-}
-
-// Test that we'll run an operation that doesn't improve performance when memory
-// pressure gets high.
-TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
- TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
- op.set_ram_anchored(100);
- manager_->RegisterOp(&op);
-
- // At first, we don't want to run this, since there is no perf_improvement.
- CHECK_EQ(false, op.WaitForStateWithTimeout(OP_FINISHED, 20));
-
- // set the ram_anchored by the high mem op so high that we'll have to run it.
- scoped_refptr<kudu::Thread> thread;
- CHECK_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
- boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
- op.WaitForState(OP_FINISHED);
- manager_->UnregisterOp(&op);
- ThreadJoiner(thread.get()).Join();
-}
-
-// Test that ops are prioritized correctly when we add log retention.
-TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
- manager_->Shutdown();
-
- TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, OP_RUNNABLE, test_tracker_);
- op1.set_ram_anchored(0);
- op1.set_logs_retained_bytes(100);
-
- TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
- op2.set_ram_anchored(100);
- op2.set_logs_retained_bytes(100);
-
- TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
- op3.set_ram_anchored(200);
- op3.set_logs_retained_bytes(100);
-
- manager_->RegisterOp(&op1);
- manager_->RegisterOp(&op2);
- manager_->RegisterOp(&op3);
-
- // We want to do the low IO op first since it clears up some log retention.
- ASSERT_EQ(&op1, manager_->FindBestOp());
-
- manager_->UnregisterOp(&op1);
-
- // Low IO is taken care of, now we find the op clears the most log retention and ram.
- ASSERT_EQ(&op3, manager_->FindBestOp());
-
- manager_->UnregisterOp(&op3);
-
- ASSERT_EQ(&op2, manager_->FindBestOp());
-
- manager_->UnregisterOp(&op2);
-}
-
-// Test adding operations and make sure that the history of recently completed operations
-// is correct in that it wraps around and doesn't grow.
-TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
- for (int i = 0; i < 5; i++) {
- string name = Substitute("op$0", i);
- TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
- op.set_perf_improvement(1);
- op.set_ram_anchored(100);
- manager_->RegisterOp(&op);
-
- CHECK_EQ(true, op.WaitForStateWithTimeout(OP_FINISHED, 200));
- manager_->UnregisterOp(&op);
-
- MaintenanceManagerStatusPB status_pb;
- manager_->GetMaintenanceManagerStatusDump(&status_pb);
- // The size should be at most the history_size.
- ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
- // See that we have the right name, even if we wrap around.
- ASSERT_EQ(name, status_pb.completed_operations(i % 4).name());
- }
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager.cc b/src/kudu/tablet/maintenance_manager.cc
deleted file mode 100644
index b611e10..0000000
--- a/src/kudu/tablet/maintenance_manager.cc
+++ /dev/null
@@ -1,419 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/maintenance_manager.h"
-
-#include <gflags/gflags.h>
-#include <memory>
-#include <stdint.h>
-#include <string>
-#include <utility>
-
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/debug/trace_event.h"
-#include "kudu/util/debug/trace_logging.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/thread.h"
-
-using std::pair;
-using std::shared_ptr;
-using strings::Substitute;
-
-DEFINE_int32(maintenance_manager_num_threads, 1,
- "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
- "reserved for emergency flushes. For spinning disks, the number of threads should "
- "not be above the number of devices.");
-TAG_FLAG(maintenance_manager_num_threads, stable);
-
-DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
- "Polling interval for the maintenance manager scheduler, "
- "in milliseconds.");
-TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
-
-DEFINE_int32(maintenance_manager_history_size, 8,
- "Number of completed operations the manager is keeping track of.");
-TAG_FLAG(maintenance_manager_history_size, hidden);
-
-DEFINE_bool(enable_maintenance_manager, true,
- "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
-TAG_FLAG(enable_maintenance_manager, unsafe);
-
-namespace kudu {
-
-using kudu::tablet::MaintenanceManagerStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
-using kudu::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
-
-MaintenanceOpStats::MaintenanceOpStats() {
- Clear();
-}
-
-void MaintenanceOpStats::Clear() {
- valid_ = false;
- runnable_ = false;
- ram_anchored_ = 0;
- logs_retained_bytes_ = 0;
- perf_improvement_ = 0;
-}
-
-MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
- : name_(std::move(name)), running_(0), io_usage_(io_usage) {}
-
-MaintenanceOp::~MaintenanceOp() {
- CHECK(!manager_.get()) << "You must unregister the " << name_
- << " Op before destroying it.";
-}
-
-void MaintenanceOp::Unregister() {
- CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
- manager_->UnregisterOp(this);
-}
-
-const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
- 0,
- 0,
- 0,
- shared_ptr<MemTracker>(),
-};
-
-MaintenanceManager::MaintenanceManager(const Options& options)
- : num_threads_(options.num_threads <= 0 ?
- FLAGS_maintenance_manager_num_threads : options.num_threads),
- cond_(&lock_),
- shutdown_(false),
- running_ops_(0),
- polling_interval_ms_(options.polling_interval_ms <= 0 ?
- FLAGS_maintenance_manager_polling_interval_ms :
- options.polling_interval_ms),
- completed_ops_count_(0),
- parent_mem_tracker_(!options.parent_mem_tracker ?
- MemTracker::GetRootTracker() : options.parent_mem_tracker) {
- CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
- .set_max_threads(num_threads_).Build(&thread_pool_));
- uint32_t history_size = options.history_size == 0 ?
- FLAGS_maintenance_manager_history_size :
- options.history_size;
- completed_ops_.resize(history_size);
-}
-
-MaintenanceManager::~MaintenanceManager() {
- Shutdown();
-}
-
-Status MaintenanceManager::Init() {
- RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
- boost::bind(&MaintenanceManager::RunSchedulerThread, this),
- &monitor_thread_));
- return Status::OK();
-}
-
-void MaintenanceManager::Shutdown() {
- {
- std::lock_guard<Mutex> guard(lock_);
- if (shutdown_) {
- return;
- }
- shutdown_ = true;
- cond_.Broadcast();
- }
- if (monitor_thread_.get()) {
- CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
- monitor_thread_.reset();
- thread_pool_->Shutdown();
- }
-}
-
-void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
- std::lock_guard<Mutex> guard(lock_);
- CHECK(!op->manager_.get()) << "Tried to register " << op->name()
- << ", but it was already registered.";
- pair<OpMapTy::iterator, bool> val
- (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
- CHECK(val.second)
- << "Tried to register " << op->name()
- << ", but it already exists in ops_.";
- op->manager_ = shared_from_this();
- op->cond_.reset(new ConditionVariable(&lock_));
- VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
-}
-
-void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
- {
- std::lock_guard<Mutex> guard(lock_);
- CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
- << ", but it is not currently registered with this maintenance manager.";
- auto iter = ops_.find(op);
- CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
- << ", but it was never registered";
- // While the op is running, wait for it to be finished.
- if (iter->first->running_ > 0) {
- VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
- << "we can unregister it.";
- }
- while (iter->first->running_ > 0) {
- op->cond_->Wait();
- iter = ops_.find(op);
- CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
- << ", but another thread unregistered it while we were "
- << "waiting for it to complete";
- }
- ops_.erase(iter);
- }
- LOG(INFO) << "Unregistered op " << op->name();
- op->cond_.reset();
- // Remove the op's shared_ptr reference to us. This might 'delete this'.
- op->manager_.reset();
-}
-
-void MaintenanceManager::RunSchedulerThread() {
- MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
-
- std::unique_lock<Mutex> guard(lock_);
- while (true) {
- // Loop until we are shutting down or it is time to run another op.
- cond_.TimedWait(polling_interval);
- if (shutdown_) {
- VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
- return;
- }
-
- // Find the best op.
- MaintenanceOp* op = FindBestOp();
- if (!op) {
- VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
- continue;
- }
-
- // Prepare the maintenance operation.
- op->running_++;
- running_ops_++;
- guard.unlock();
- bool ready = op->Prepare();
- guard.lock();
- if (!ready) {
- LOG(INFO) << "Prepare failed for " << op->name()
- << ". Re-running scheduler.";
- op->running_--;
- op->cond_->Signal();
- continue;
- }
-
- // Run the maintenance operation.
- Status s = thread_pool_->SubmitFunc(boost::bind(
- &MaintenanceManager::LaunchOp, this, op));
- CHECK(s.ok());
- }
-}
-
-// Finding the best operation goes through four filters:
-// - If there's an Op that we can run quickly that frees log retention, we run it.
-// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
-// free), we run the Op with the highest RAM usage.
-// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
-// qualify, then we run the one that also frees up the most RAM).
-// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
-// performance the most.
-//
-// The reason it's done this way is that we want to prioritize limiting the amount of resources we
-// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
-// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
-//
-// In the third priority we're at a point where nothing's urgent and there's nothing we can run
-// quickly.
-// TODO We currently optimize for freeing log retention but we could consider having some sort of
-// sliding priority between log retention and RAM usage. For example, is an Op that frees
-// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
-// and 128MB of RAM? Maybe a more holistic approach would be better.
-MaintenanceOp* MaintenanceManager::FindBestOp() {
- TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
-
- if (!FLAGS_enable_maintenance_manager) {
- VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
- return nullptr;
- }
- size_t free_threads = num_threads_ - running_ops_;
- if (free_threads == 0) {
- VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
- return nullptr;
- }
-
- int64_t low_io_most_logs_retained_bytes = 0;
- MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
-
- uint64_t most_mem_anchored = 0;
- MaintenanceOp* most_mem_anchored_op = nullptr;
-
- int64_t most_logs_retained_bytes = 0;
- int64_t most_logs_retained_bytes_ram_anchored = 0;
- MaintenanceOp* most_logs_retained_bytes_op = nullptr;
-
- double best_perf_improvement = 0;
- MaintenanceOp* best_perf_improvement_op = nullptr;
- for (OpMapTy::value_type &val : ops_) {
- MaintenanceOp* op(val.first);
- MaintenanceOpStats& stats(val.second);
- // Update op stats.
- stats.Clear();
- op->UpdateStats(&stats);
- if (!stats.valid() || !stats.runnable()) {
- continue;
- }
- if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
- op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
- low_io_most_logs_retained_bytes_op = op;
- low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
- }
-
- if (stats.ram_anchored() > most_mem_anchored) {
- most_mem_anchored_op = op;
- most_mem_anchored = stats.ram_anchored();
- }
- // We prioritize ops that can free more logs, but when it's the same we pick the one that
- // also frees up the most memory.
- if (stats.logs_retained_bytes() > 0 &&
- (stats.logs_retained_bytes() > most_logs_retained_bytes ||
- (stats.logs_retained_bytes() == most_logs_retained_bytes &&
- stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
- most_logs_retained_bytes_op = op;
- most_logs_retained_bytes = stats.logs_retained_bytes();
- most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
- }
- if ((!best_perf_improvement_op) ||
- (stats.perf_improvement() > best_perf_improvement)) {
- best_perf_improvement_op = op;
- best_perf_improvement = stats.perf_improvement();
- }
- }
-
- // Look at ops that we can run quickly that free up log retention.
- if (low_io_most_logs_retained_bytes_op) {
- if (low_io_most_logs_retained_bytes > 0) {
- VLOG_AND_TRACE("maintenance", 1)
- << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
- << "because it can free up more logs "
- << "at " << low_io_most_logs_retained_bytes
- << " bytes with a low IO cost";
- return low_io_most_logs_retained_bytes_op;
- }
- }
-
- // Look at free memory. If it is dangerously low, we must select something
- // that frees memory-- the op with the most anchored memory.
- double capacity_pct;
- if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
- if (!most_mem_anchored_op) {
- string msg = StringPrintf("we have exceeded our soft memory limit "
- "(current capacity is %.2f%%). However, there are no ops currently "
- "runnable which would free memory.", capacity_pct);
- LOG(INFO) << msg;
- return nullptr;
- }
- VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
- << "(current capacity is " << capacity_pct << "%). Running the op "
- << "which anchors the most memory: " << most_mem_anchored_op->name();
- return most_mem_anchored_op;
- }
-
- if (most_logs_retained_bytes_op) {
- VLOG_AND_TRACE("maintenance", 1)
- << "Performing " << most_logs_retained_bytes_op->name() << ", "
- << "because it can free up more logs " << "at " << most_logs_retained_bytes
- << " bytes";
- return most_logs_retained_bytes_op;
- }
-
- if (best_perf_improvement_op) {
- if (best_perf_improvement > 0) {
- VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
- << "because it had the best perf_improvement score, "
- << "at " << best_perf_improvement;
- return best_perf_improvement_op;
- }
- }
- return nullptr;
-}
-
-void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
- MonoTime start_time(MonoTime::Now(MonoTime::FINE));
- op->RunningGauge()->Increment();
- LOG_TIMING(INFO, Substitute("running $0", op->name())) {
- TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
- "name", op->name());
- op->Perform();
- }
- op->RunningGauge()->Decrement();
- MonoTime end_time(MonoTime::Now(MonoTime::FINE));
- MonoDelta delta(end_time.GetDeltaSince(start_time));
- std::lock_guard<Mutex> guard(lock_);
-
- CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
- completed_op.name = op->name();
- completed_op.duration = delta;
- completed_op.start_mono_time = start_time;
- completed_ops_count_++;
-
- op->DurationHistogram()->Increment(delta.ToMilliseconds());
-
- running_ops_--;
- op->running_--;
- op->cond_->Signal();
-}
-
-void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
- DCHECK(out_pb != nullptr);
- std::lock_guard<Mutex> guard(lock_);
- MaintenanceOp* best_op = FindBestOp();
- for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
- MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
- MaintenanceOp* op(val.first);
- MaintenanceOpStats& stat(val.second);
- op_pb->set_name(op->name());
- op_pb->set_running(op->running());
- if (stat.valid()) {
- op_pb->set_runnable(stat.runnable());
- op_pb->set_ram_anchored_bytes(stat.ram_anchored());
- op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
- op_pb->set_perf_improvement(stat.perf_improvement());
- } else {
- op_pb->set_runnable(false);
- op_pb->set_ram_anchored_bytes(0);
- op_pb->set_logs_retained_bytes(0);
- op_pb->set_perf_improvement(0.0);
- }
-
- if (best_op == op) {
- out_pb->mutable_best_op()->CopyFrom(*op_pb);
- }
- }
-
- for (const CompletedOp& completed_op : completed_ops_) {
- if (!completed_op.name.empty()) {
- MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations();
- completed_pb->set_name(completed_op.name);
- completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds());
-
- MonoDelta delta(MonoTime::Now(MonoTime::FINE).GetDeltaSince(completed_op.start_mono_time));
- completed_pb->set_secs_since_start(delta.ToSeconds());
- }
- }
-}
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager.h b/src/kudu/tablet/maintenance_manager.h
deleted file mode 100644
index 1d1a609..0000000
--- a/src/kudu/tablet/maintenance_manager.h
+++ /dev/null
@@ -1,280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TABLET_MAINTENANCE_MANAGER_H
-#define KUDU_TABLET_MAINTENANCE_MANAGER_H
-
-#include <stdint.h>
-
-#include <map>
-#include <memory>
-#include <set>
-#include <string>
-#include <vector>
-
-#include "kudu/gutil/macros.h"
-#include "kudu/tablet/mvcc.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/condition_variable.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/mutex.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/thread.h"
-#include "kudu/util/threadpool.h"
-
-namespace kudu {
-
-template<class T>
-class AtomicGauge;
-class Histogram;
-class MaintenanceManager;
-class MemTracker;
-
-class MaintenanceOpStats {
- public:
- MaintenanceOpStats();
-
- // Zero all stats. They are invalid until the first setter is called.
- void Clear();
-
- bool runnable() const {
- DCHECK(valid_);
- return runnable_;
- }
-
- void set_runnable(bool runnable) {
- UpdateLastModified();
- runnable_ = runnable;
- }
-
- uint64_t ram_anchored() const {
- DCHECK(valid_);
- return ram_anchored_;
- }
-
- void set_ram_anchored(uint64_t ram_anchored) {
- UpdateLastModified();
- ram_anchored_ = ram_anchored;
- }
-
- int64_t logs_retained_bytes() const {
- DCHECK(valid_);
- return logs_retained_bytes_;
- }
-
- void set_logs_retained_bytes(int64_t logs_retained_bytes) {
- UpdateLastModified();
- logs_retained_bytes_ = logs_retained_bytes;
- }
-
- double perf_improvement() const {
- DCHECK(valid_);
- return perf_improvement_;
- }
-
- void set_perf_improvement(double perf_improvement) {
- UpdateLastModified();
- perf_improvement_ = perf_improvement;
- }
-
- const MonoTime& last_modified() const {
- DCHECK(valid_);
- return last_modified_;
- }
-
- bool valid() const {
- return valid_;
- }
-
- private:
- void UpdateLastModified() {
- valid_ = true;
- last_modified_ = MonoTime::Now(MonoTime::FINE);
- }
-
- // True if these stats are valid.
- bool valid_;
-
- // True if this op can be run now.
- bool runnable_;
-
- // The approximate amount of memory that not doing this operation keeps
- // around. This number is used to decide when to start freeing memory, so it
- // should be fairly accurate. May be 0.
- uint64_t ram_anchored_;
-
- // The approximate amount of disk space that not doing this operation keeps us from GCing from
- // the logs. May be 0.
- int64_t logs_retained_bytes_;
-
- // The estimated performance improvement-- how good it is to do this on some
- // absolute scale (yet TBD).
- double perf_improvement_;
-
- // The last time that the stats were modified.
- MonoTime last_modified_;
-};
-
-// MaintenanceOp objects represent background operations that the
-// MaintenanceManager can schedule. Once a MaintenanceOp is registered, the
-// manager will periodically poll it for statistics. The registrant is
-// responsible for managing the memory associated with the MaintenanceOp object.
-// Op objects should be unregistered before being de-allocated.
-class MaintenanceOp {
- public:
- friend class MaintenanceManager;
-
- // General indicator of how much IO the Op will use.
- enum IOUsage {
- LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
- HIGH_IO_USAGE // Everything else.
- };
-
- explicit MaintenanceOp(std::string name, IOUsage io_usage);
- virtual ~MaintenanceOp();
-
- // Unregister this op, if it is currently registered.
- void Unregister();
-
- // Update the op statistics. This will be called every scheduling period
- // (about a few times a second), so it should not be too expensive. It's
- // possible for the returned statistics to be invalid; the caller should
- // call MaintenanceOpStats::valid() before using them. This will be run
- // under the MaintenanceManager lock.
- virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
-
- // Prepare to perform the operation. This will be run without holding the
- // maintenance manager lock. It should be short, since it is run from the
- // context of the maintenance op scheduler thread rather than a worker thread.
- // If this returns false, we will abort the operation.
- virtual bool Prepare() = 0;
-
- // Perform the operation. This will be run without holding the maintenance
- // manager lock, and may take a long time.
- virtual void Perform() = 0;
-
- // Returns the histogram for this op that tracks duration. Cannot be NULL.
- virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
-
- // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
- virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
-
- uint32_t running() { return running_; }
-
- std::string name() const { return name_; }
-
- IOUsage io_usage() const { return io_usage_; }
-
- private:
- DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
-
- // The name of the operation. Op names must be unique.
- const std::string name_;
-
- // The number of times that this op is currently running.
- uint32_t running_;
-
- // Condition variable which the UnregisterOp function can wait on.
- //
- // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
- // it only exists when the op is registered.
- gscoped_ptr<ConditionVariable> cond_;
-
- // The MaintenanceManager with which this op is registered, or null
- // if it is not registered.
- std::shared_ptr<MaintenanceManager> manager_;
-
- IOUsage io_usage_;
-};
-
-struct MaintenanceOpComparator {
- bool operator() (const MaintenanceOp* lhs,
- const MaintenanceOp* rhs) const {
- return lhs->name().compare(rhs->name()) < 0;
- }
-};
-
-// Holds the information regarding a recently completed operation.
-struct CompletedOp {
- std::string name;
- MonoDelta duration;
- MonoTime start_mono_time;
-};
-
-// The MaintenanceManager manages the scheduling of background operations such
-// as flushes or compactions. It runs these operations in the background, in a
-// thread pool. It uses information provided in MaintenanceOpStats objects to
-// decide which operations, if any, to run.
-class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
- public:
- struct Options {
- int32_t num_threads;
- int32_t polling_interval_ms;
- uint32_t history_size;
- std::shared_ptr<MemTracker> parent_mem_tracker;
- };
-
- explicit MaintenanceManager(const Options& options);
- ~MaintenanceManager();
-
- Status Init();
- void Shutdown();
-
- // Register an op with the manager.
- void RegisterOp(MaintenanceOp* op);
-
- // Unregister an op with the manager.
- // If the Op is currently running, it will not be interrupted. However, this
- // function will block until the Op is finished.
- void UnregisterOp(MaintenanceOp* op);
-
- void GetMaintenanceManagerStatusDump(tablet::MaintenanceManagerStatusPB* out_pb);
-
- static const Options DEFAULT_OPTIONS;
-
- private:
- FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
- typedef std::map<MaintenanceOp*, MaintenanceOpStats,
- MaintenanceOpComparator> OpMapTy;
-
- void RunSchedulerThread();
-
- // find the best op, or null if there is nothing we want to run
- MaintenanceOp* FindBestOp();
-
- void LaunchOp(MaintenanceOp* op);
-
- const int32_t num_threads_;
- OpMapTy ops_; // registered operations
- Mutex lock_;
- scoped_refptr<kudu::Thread> monitor_thread_;
- gscoped_ptr<ThreadPool> thread_pool_;
- ConditionVariable cond_;
- bool shutdown_;
- uint64_t running_ops_;
- int32_t polling_interval_ms_;
- // Vector used as a circular buffer for recently completed ops. Elements need to be added at
- // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
- std::vector<CompletedOp> completed_ops_;
- int64_t completed_ops_count_;
- std::shared_ptr<MemTracker> parent_mem_tracker_;
-
- DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
-};
-
-} // namespace kudu
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index ff63aef..383e739 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -45,7 +45,6 @@
#include "kudu/tablet/compaction_policy.h"
#include "kudu/tablet/delta_compaction.h"
#include "kudu/tablet/diskrowset.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/rowset_info.h"
#include "kudu/tablet/rowset_tree.h"
@@ -61,6 +60,7 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/stopwatch.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.proto b/src/kudu/tablet/tablet.proto
index ae02e35..18ebe50 100644
--- a/src/kudu/tablet/tablet.proto
+++ b/src/kudu/tablet/tablet.proto
@@ -96,32 +96,3 @@ message TabletStatusPB {
optional PartitionPB partition = 9;
optional int64 estimated_on_disk_size = 7;
}
-
-// Used to present the maintenance manager's internal state.
-message MaintenanceManagerStatusPB {
- message MaintenanceOpPB {
- required string name = 1;
- // Number of times this operation is currently running.
- required uint32 running = 2;
- required bool runnable = 3;
- required uint64 ram_anchored_bytes = 4;
- required int64 logs_retained_bytes = 5;
- required double perf_improvement = 6;
- }
-
- message CompletedOpPB {
- required string name = 1;
- required int32 duration_millis = 2;
- // Number of seconds since this operation started.
- required int32 secs_since_start = 3;
- }
-
- // The next operation that would run.
- optional MaintenanceOpPB best_op = 1;
-
- // List of all the operations.
- repeated MaintenanceOpPB registered_operations = 2;
-
- // This list isn't in order of anything. Can contain the same operation mutiple times.
- repeated CompletedOpPB completed_operations = 3;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index cecf443..f4ee1c3 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -18,7 +18,7 @@
#ifndef KUDU_TABLET_TABLET_MM_OPS_H_
#define KUDU_TABLET_TABLET_MM_OPS_H_
-#include "kudu/tablet/maintenance_manager.h"
+#include "kudu/util/maintenance_manager.h"
namespace kudu {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
index ca43117..b1c03ef 100644
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ b/src/kudu/tablet/tablet_peer-test.cc
@@ -32,7 +32,6 @@
#include "kudu/rpc/messenger.h"
#include "kudu/server/clock.h"
#include "kudu/server/logical_clock.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/transactions/transaction.h"
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/tablet/transactions/write_transaction.h"
@@ -40,6 +39,7 @@
#include "kudu/tablet/tablet_peer_mm_ops.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_util.h"
#include "kudu/util/test_macros.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
index 3520d1b..d116cda 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ b/src/kudu/tablet/tablet_peer_mm_ops.cc
@@ -24,9 +24,9 @@
#include <string>
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
DEFINE_int32(flush_threshold_mb, 1024,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.h b/src/kudu/tablet/tablet_peer_mm_ops.h
index a4b475d..a985802 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.h
+++ b/src/kudu/tablet/tablet_peer_mm_ops.h
@@ -18,8 +18,8 @@
#ifndef KUDU_TABLET_TABLET_PEER_MM_OPS_H_
#define KUDU_TABLET_TABLET_PEER_MM_OPS_H_
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet_peer.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/stopwatch.h"
namespace kudu {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/mini_tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.cc b/src/kudu/tserver/mini_tablet_server.cc
index 380946d..92f1ad2 100644
--- a/src/kudu/tserver/mini_tablet_server.cc
+++ b/src/kudu/tserver/mini_tablet_server.cc
@@ -31,12 +31,12 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/webserver.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 1d5022e..3bcf34a 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -40,7 +40,6 @@
#include "kudu/rpc/messenger.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tablet/local_tablet_writer.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/mini_tablet_server.h"
@@ -51,6 +50,7 @@
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/metrics.h"
#include "kudu/util/test_graph.h"
#include "kudu/util/test_util.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index ea3faad..d933491 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -27,13 +27,13 @@
#include "kudu/rpc/service_if.h"
#include "kudu/server/rpc_server.h"
#include "kudu/server/webserver.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_service.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver-path-handlers.h"
#include "kudu/tserver/remote_bootstrap_service.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 67afcb6..6cad63f 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -32,13 +32,13 @@
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/server/webui_util.h"
-#include "kudu/tablet/maintenance_manager.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
#include "kudu/util/url-coding.h"
using kudu::consensus::GetConsensusRole;
@@ -46,9 +46,9 @@ using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
using kudu::consensus::ConsensusStatePB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::TransactionStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
-using kudu::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
+using kudu::MaintenanceManagerStatusPB;
+using kudu::MaintenanceManagerStatusPB_CompletedOpPB;
+using kudu::MaintenanceManagerStatusPB_MaintenanceOpPB;
using kudu::tablet::Tablet;
using kudu::tablet::TabletPeer;
using kudu::tablet::TabletStatusPB;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 01f9b1a..a924f38 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -30,6 +30,20 @@ ADD_EXPORTABLE_LIBRARY(histogram_proto
NONLINK_DEPS ${HISTOGRAM_PROTO_TGTS})
#######################################
+# maintenance_manager_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ MAINTENANCE_MANAGER_PROTO_SRCS MAINTENANCE_MANAGER_PROTO_HDRS MAINTENANCE_MANAGER_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES maintenance_manager.proto)
+ADD_EXPORTABLE_LIBRARY(maintenance_manager_proto
+ SRCS ${MAINTENANCE_MANAGER_PROTO_SRCS}
+ DEPS protobuf
+ NONLINK_DEPS ${MAINTENANCE_MANAGER_PROTO_TGTS})
+
+#######################################
# pb_util_proto
#######################################
@@ -118,6 +132,7 @@ set(UTIL_SRCS
kernel_stack_watchdog.cc
locks.cc
logging.cc
+ maintenance_manager.cc
malloc.cc
memcmpable_varint.cc
memory/arena.cc
@@ -181,6 +196,7 @@ set(UTIL_LIBS
glog
gutil
histogram_proto
+ maintenance_manager_proto
pb_util_proto
protobuf
version_info_proto
@@ -279,6 +295,7 @@ ADD_KUDU_TEST(interval_tree-test)
ADD_KUDU_TEST(jsonreader-test)
ADD_KUDU_TEST(knapsack_solver-test)
ADD_KUDU_TEST(logging-test)
+ADD_KUDU_TEST(maintenance_manager-test)
ADD_KUDU_TEST(map-util-test)
ADD_KUDU_TEST(mem_tracker-test)
ADD_KUDU_TEST(memcmpable_varint-test LABELS no_tsan)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
new file mode 100644
index 0000000..87023dc
--- /dev/null
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_entity(test);
+METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
+ "Number of Maintenance Operations Running",
+ kudu::MetricUnit::kMaintenanceOperations,
+ "The number of background maintenance operations currently running.");
+METRIC_DEFINE_histogram(test, maintenance_op_duration,
+ "Maintenance Operation Duration",
+ kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
+
+namespace kudu {
+
+const int kHistorySize = 4;
+
+class MaintenanceManagerTest : public KuduTest {
+ public:
+ MaintenanceManagerTest() {
+ test_tracker_ = MemTracker::CreateTracker(1000, "test");
+ MaintenanceManager::Options options;
+ options.num_threads = 2;
+ options.polling_interval_ms = 1;
+ options.history_size = kHistorySize;
+ options.parent_mem_tracker = test_tracker_;
+ manager_.reset(new MaintenanceManager(options));
+ manager_->Init();
+ }
+ ~MaintenanceManagerTest() {
+ manager_->Shutdown();
+ }
+
+ protected:
+ shared_ptr<MemTracker> test_tracker_;
+ shared_ptr<MaintenanceManager> manager_;
+};
+
+// Just create the MaintenanceManager and then shut it down, to make sure
+// there are no race conditions there.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
+enum TestMaintenanceOpState {
+ OP_DISABLED,
+ OP_RUNNABLE,
+ OP_RUNNING,
+ OP_FINISHED,
+};
+
+class TestMaintenanceOp : public MaintenanceOp {
+ public:
+ TestMaintenanceOp(const std::string& name,
+ IOUsage io_usage,
+ TestMaintenanceOpState state,
+ const shared_ptr<MemTracker>& tracker)
+ : MaintenanceOp(name, io_usage),
+ state_change_cond_(&lock_),
+ state_(state),
+ consumption_(tracker, 500),
+ logs_retained_bytes_(0),
+ perf_improvement_(0),
+ metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
+ maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
+ maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)) {
+ }
+
+ virtual ~TestMaintenanceOp() {}
+
+ virtual bool Prepare() OVERRIDE {
+ std::lock_guard<Mutex> guard(lock_);
+ if (state_ != OP_RUNNABLE) {
+ return false;
+ }
+ state_ = OP_RUNNING;
+ state_change_cond_.Broadcast();
+ DLOG(INFO) << "Prepared op " << name();
+ return true;
+ }
+
+ virtual void Perform() OVERRIDE {
+ DLOG(INFO) << "Performing op " << name();
+ std::lock_guard<Mutex> guard(lock_);
+ CHECK_EQ(OP_RUNNING, state_);
+ state_ = OP_FINISHED;
+ state_change_cond_.Broadcast();
+ }
+
+ virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+ std::lock_guard<Mutex> guard(lock_);
+ stats->set_runnable(state_ == OP_RUNNABLE);
+ stats->set_ram_anchored(consumption_.consumption());
+ stats->set_logs_retained_bytes(logs_retained_bytes_);
+ stats->set_perf_improvement(perf_improvement_);
+ }
+
+ void Enable() {
+ std::lock_guard<Mutex> guard(lock_);
+ DCHECK((state_ == OP_DISABLED) || (state_ == OP_FINISHED));
+ state_ = OP_RUNNABLE;
+ state_change_cond_.Broadcast();
+ }
+
+ void WaitForState(TestMaintenanceOpState state) {
+ std::lock_guard<Mutex> guard(lock_);
+ while (true) {
+ if (state_ == state) {
+ return;
+ }
+ state_change_cond_.Wait();
+ }
+ }
+
+ bool WaitForStateWithTimeout(TestMaintenanceOpState state, int ms) {
+ MonoDelta to_wait = MonoDelta::FromMilliseconds(ms);
+ std::lock_guard<Mutex> guard(lock_);
+ while (true) {
+ if (state_ == state) {
+ return true;
+ }
+ if (!state_change_cond_.TimedWait(to_wait)) {
+ return false;
+ }
+ }
+ }
+
+ void set_ram_anchored(uint64_t ram_anchored) {
+ std::lock_guard<Mutex> guard(lock_);
+ consumption_.Reset(ram_anchored);
+ }
+
+ void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
+ std::lock_guard<Mutex> guard(lock_);
+ logs_retained_bytes_ = logs_retained_bytes;
+ }
+
+ void set_perf_improvement(uint64_t perf_improvement) {
+ std::lock_guard<Mutex> guard(lock_);
+ perf_improvement_ = perf_improvement;
+ }
+
+ virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
+ return maintenance_op_duration_;
+ }
+
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
+ return maintenance_ops_running_;
+ }
+
+ private:
+ Mutex lock_;
+ ConditionVariable state_change_cond_;
+ enum TestMaintenanceOpState state_;
+ ScopedTrackedConsumption consumption_;
+ uint64_t logs_retained_bytes_;
+ uint64_t perf_improvement_;
+ MetricRegistry metric_registry_;
+ scoped_refptr<MetricEntity> metric_entity_;
+ scoped_refptr<Histogram> maintenance_op_duration_;
+ scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+};
+
+// Create an op and wait for it to start running. Unregister it while it is
+// running and verify that UnregisterOp waits for it to finish before
+// proceeding.
+TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
+ TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, OP_DISABLED, test_tracker_);
+ op1.set_ram_anchored(1001);
+ manager_->RegisterOp(&op1);
+ scoped_refptr<kudu::Thread> thread;
+ CHECK_OK(Thread::Create("TestThread", "TestRegisterUnregister",
+ boost::bind(&TestMaintenanceOp::Enable, &op1), &thread));
+ op1.WaitForState(OP_FINISHED);
+ manager_->UnregisterOp(&op1);
+ ThreadJoiner(thread.get()).Join();
+}
+
+// Test that we'll run an operation that doesn't improve performance when memory
+// pressure gets high.
+TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
+ TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+ op.set_ram_anchored(100);
+ manager_->RegisterOp(&op);
+
+ // At first, we don't want to run this, since there is no perf_improvement.
+ CHECK_EQ(false, op.WaitForStateWithTimeout(OP_FINISHED, 20));
+
+ // set the ram_anchored by the high mem op so high that we'll have to run it.
+ scoped_refptr<kudu::Thread> thread;
+ CHECK_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
+ boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
+ op.WaitForState(OP_FINISHED);
+ manager_->UnregisterOp(&op);
+ ThreadJoiner(thread.get()).Join();
+}
+
+// Test that ops are prioritized correctly when we add log retention.
+TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
+ manager_->Shutdown();
+
+ TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, OP_RUNNABLE, test_tracker_);
+ op1.set_ram_anchored(0);
+ op1.set_logs_retained_bytes(100);
+
+ TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+ op2.set_ram_anchored(100);
+ op2.set_logs_retained_bytes(100);
+
+ TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+ op3.set_ram_anchored(200);
+ op3.set_logs_retained_bytes(100);
+
+ manager_->RegisterOp(&op1);
+ manager_->RegisterOp(&op2);
+ manager_->RegisterOp(&op3);
+
+ // We want to do the low IO op first since it clears up some log retention.
+ ASSERT_EQ(&op1, manager_->FindBestOp());
+
+ manager_->UnregisterOp(&op1);
+
+ // Low IO is taken care of, now we find the op clears the most log retention and ram.
+ ASSERT_EQ(&op3, manager_->FindBestOp());
+
+ manager_->UnregisterOp(&op3);
+
+ ASSERT_EQ(&op2, manager_->FindBestOp());
+
+ manager_->UnregisterOp(&op2);
+}
+
+// Test adding operations and make sure that the history of recently completed operations
+// is correct in that it wraps around and doesn't grow.
+TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
+ for (int i = 0; i < 5; i++) {
+ string name = Substitute("op$0", i);
+ TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+ op.set_perf_improvement(1);
+ op.set_ram_anchored(100);
+ manager_->RegisterOp(&op);
+
+ CHECK_EQ(true, op.WaitForStateWithTimeout(OP_FINISHED, 200));
+ manager_->UnregisterOp(&op);
+
+ MaintenanceManagerStatusPB status_pb;
+ manager_->GetMaintenanceManagerStatusDump(&status_pb);
+ // The size should be at most the history_size.
+ ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+ // See that we have the right name, even if we wrap around.
+ ASSERT_EQ(name, status_pb.completed_operations(i % 4).name());
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
new file mode 100644
index 0000000..1060bf8
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.cc
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/maintenance_manager.h"
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+
+using std::pair;
+using std::shared_ptr;
+using strings::Substitute;
+
+DEFINE_int32(maintenance_manager_num_threads, 1,
+ "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
+ "reserved for emergency flushes. For spinning disks, the number of threads should "
+ "not be above the number of devices.");
+TAG_FLAG(maintenance_manager_num_threads, stable);
+
+DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
+ "Polling interval for the maintenance manager scheduler, "
+ "in milliseconds.");
+TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
+
+DEFINE_int32(maintenance_manager_history_size, 8,
+ "Number of completed operations the manager is keeping track of.");
+TAG_FLAG(maintenance_manager_history_size, hidden);
+
+DEFINE_bool(enable_maintenance_manager, true,
+ "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
+TAG_FLAG(enable_maintenance_manager, unsafe);
+
+namespace kudu {
+
+MaintenanceOpStats::MaintenanceOpStats() {
+ Clear();
+}
+
+void MaintenanceOpStats::Clear() {
+ valid_ = false;
+ runnable_ = false;
+ ram_anchored_ = 0;
+ logs_retained_bytes_ = 0;
+ perf_improvement_ = 0;
+}
+
+MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+ : name_(std::move(name)), running_(0), io_usage_(io_usage) {}
+
+MaintenanceOp::~MaintenanceOp() {
+ CHECK(!manager_.get()) << "You must unregister the " << name_
+ << " Op before destroying it.";
+}
+
+void MaintenanceOp::Unregister() {
+ CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
+ manager_->UnregisterOp(this);
+}
+
+const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
+ 0,
+ 0,
+ 0,
+ shared_ptr<MemTracker>(),
+};
+
+MaintenanceManager::MaintenanceManager(const Options& options)
+ : num_threads_(options.num_threads <= 0 ?
+ FLAGS_maintenance_manager_num_threads : options.num_threads),
+ cond_(&lock_),
+ shutdown_(false),
+ running_ops_(0),
+ polling_interval_ms_(options.polling_interval_ms <= 0 ?
+ FLAGS_maintenance_manager_polling_interval_ms :
+ options.polling_interval_ms),
+ completed_ops_count_(0),
+ parent_mem_tracker_(!options.parent_mem_tracker ?
+ MemTracker::GetRootTracker() : options.parent_mem_tracker) {
+ CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
+ .set_max_threads(num_threads_).Build(&thread_pool_));
+ uint32_t history_size = options.history_size == 0 ?
+ FLAGS_maintenance_manager_history_size :
+ options.history_size;
+ completed_ops_.resize(history_size);
+}
+
+MaintenanceManager::~MaintenanceManager() {
+ Shutdown();
+}
+
+Status MaintenanceManager::Init() {
+ RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
+ boost::bind(&MaintenanceManager::RunSchedulerThread, this),
+ &monitor_thread_));
+ return Status::OK();
+}
+
+void MaintenanceManager::Shutdown() {
+ {
+ std::lock_guard<Mutex> guard(lock_);
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ cond_.Broadcast();
+ }
+ if (monitor_thread_.get()) {
+ CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
+ monitor_thread_.reset();
+ thread_pool_->Shutdown();
+ }
+}
+
+void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+ std::lock_guard<Mutex> guard(lock_);
+ CHECK(!op->manager_.get()) << "Tried to register " << op->name()
+ << ", but it was already registered.";
+ pair<OpMapTy::iterator, bool> val
+ (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
+ CHECK(val.second)
+ << "Tried to register " << op->name()
+ << ", but it already exists in ops_.";
+ op->manager_ = shared_from_this();
+ op->cond_.reset(new ConditionVariable(&lock_));
+ VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
+}
+
+void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
+ {
+ std::lock_guard<Mutex> guard(lock_);
+ CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
+ << ", but it is not currently registered with this maintenance manager.";
+ auto iter = ops_.find(op);
+ CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+ << ", but it was never registered";
+ // While the op is running, wait for it to be finished.
+ if (iter->first->running_ > 0) {
+ VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
+ << "we can unregister it.";
+ }
+ while (iter->first->running_ > 0) {
+ op->cond_->Wait();
+ iter = ops_.find(op);
+ CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+ << ", but another thread unregistered it while we were "
+ << "waiting for it to complete";
+ }
+ ops_.erase(iter);
+ }
+ LOG(INFO) << "Unregistered op " << op->name();
+ op->cond_.reset();
+ // Remove the op's shared_ptr reference to us. This might 'delete this'.
+ op->manager_.reset();
+}
+
+void MaintenanceManager::RunSchedulerThread() {
+ MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
+
+ std::unique_lock<Mutex> guard(lock_);
+ while (true) {
+ // Loop until we are shutting down or it is time to run another op.
+ cond_.TimedWait(polling_interval);
+ if (shutdown_) {
+ VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
+ return;
+ }
+
+ // Find the best op.
+ MaintenanceOp* op = FindBestOp();
+ if (!op) {
+ VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
+ continue;
+ }
+
+ // Prepare the maintenance operation.
+ op->running_++;
+ running_ops_++;
+ guard.unlock();
+ bool ready = op->Prepare();
+ guard.lock();
+ if (!ready) {
+ LOG(INFO) << "Prepare failed for " << op->name()
+ << ". Re-running scheduler.";
+ op->running_--;
+ op->cond_->Signal();
+ continue;
+ }
+
+ // Run the maintenance operation.
+ Status s = thread_pool_->SubmitFunc(boost::bind(
+ &MaintenanceManager::LaunchOp, this, op));
+ CHECK(s.ok());
+ }
+}
+
+// Finding the best operation goes through four filters:
+// - If there's an Op that we can run quickly that frees log retention, we run it.
+// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
+// free), we run the Op with the highest RAM usage.
+// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
+// qualify, then we run the one that also frees up the most RAM).
+// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
+// performance the most.
+//
+// The reason it's done this way is that we want to prioritize limiting the amount of resources we
+// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
+// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
+//
+// In the third priority we're at a point where nothing's urgent and there's nothing we can run
+// quickly.
+// TODO We currently optimize for freeing log retention but we could consider having some sort of
+// sliding priority between log retention and RAM usage. For example, is an Op that frees
+// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
+// and 128MB of RAM? Maybe a more holistic approach would be better.
+MaintenanceOp* MaintenanceManager::FindBestOp() {
+ TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
+
+ if (!FLAGS_enable_maintenance_manager) {
+ VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
+ return nullptr;
+ }
+ size_t free_threads = num_threads_ - running_ops_;
+ if (free_threads == 0) {
+ VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
+ return nullptr;
+ }
+
+ int64_t low_io_most_logs_retained_bytes = 0;
+ MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
+
+ uint64_t most_mem_anchored = 0;
+ MaintenanceOp* most_mem_anchored_op = nullptr;
+
+ int64_t most_logs_retained_bytes = 0;
+ int64_t most_logs_retained_bytes_ram_anchored = 0;
+ MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+
+ double best_perf_improvement = 0;
+ MaintenanceOp* best_perf_improvement_op = nullptr;
+ for (OpMapTy::value_type &val : ops_) {
+ MaintenanceOp* op(val.first);
+ MaintenanceOpStats& stats(val.second);
+ // Update op stats.
+ stats.Clear();
+ op->UpdateStats(&stats);
+ if (!stats.valid() || !stats.runnable()) {
+ continue;
+ }
+ if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+ op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
+ low_io_most_logs_retained_bytes_op = op;
+ low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+ }
+
+ if (stats.ram_anchored() > most_mem_anchored) {
+ most_mem_anchored_op = op;
+ most_mem_anchored = stats.ram_anchored();
+ }
+ // We prioritize ops that can free more logs, but when it's the same we pick the one that
+ // also frees up the most memory.
+ if (stats.logs_retained_bytes() > 0 &&
+ (stats.logs_retained_bytes() > most_logs_retained_bytes ||
+ (stats.logs_retained_bytes() == most_logs_retained_bytes &&
+ stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
+ most_logs_retained_bytes_op = op;
+ most_logs_retained_bytes = stats.logs_retained_bytes();
+ most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+ }
+ if ((!best_perf_improvement_op) ||
+ (stats.perf_improvement() > best_perf_improvement)) {
+ best_perf_improvement_op = op;
+ best_perf_improvement = stats.perf_improvement();
+ }
+ }
+
+ // Look at ops that we can run quickly that free up log retention.
+ if (low_io_most_logs_retained_bytes_op) {
+ if (low_io_most_logs_retained_bytes > 0) {
+ VLOG_AND_TRACE("maintenance", 1)
+ << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
+ << "because it can free up more logs "
+ << "at " << low_io_most_logs_retained_bytes
+ << " bytes with a low IO cost";
+ return low_io_most_logs_retained_bytes_op;
+ }
+ }
+
+ // Look at free memory. If it is dangerously low, we must select something
+ // that frees memory-- the op with the most anchored memory.
+ double capacity_pct;
+ if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+ if (!most_mem_anchored_op) {
+ string msg = StringPrintf("we have exceeded our soft memory limit "
+ "(current capacity is %.2f%%). However, there are no ops currently "
+ "runnable which would free memory.", capacity_pct);
+ LOG(INFO) << msg;
+ return nullptr;
+ }
+ VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
+ << "(current capacity is " << capacity_pct << "%). Running the op "
+ << "which anchors the most memory: " << most_mem_anchored_op->name();
+ return most_mem_anchored_op;
+ }
+
+ if (most_logs_retained_bytes_op) {
+ VLOG_AND_TRACE("maintenance", 1)
+ << "Performing " << most_logs_retained_bytes_op->name() << ", "
+ << "because it can free up more logs " << "at " << most_logs_retained_bytes
+ << " bytes";
+ return most_logs_retained_bytes_op;
+ }
+
+ if (best_perf_improvement_op) {
+ if (best_perf_improvement > 0) {
+ VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
+ << "because it had the best perf_improvement score, "
+ << "at " << best_perf_improvement;
+ return best_perf_improvement_op;
+ }
+ }
+ return nullptr;
+}
+
+void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
+ MonoTime start_time(MonoTime::Now(MonoTime::FINE));
+ op->RunningGauge()->Increment();
+ LOG_TIMING(INFO, Substitute("running $0", op->name())) {
+ TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
+ "name", op->name());
+ op->Perform();
+ }
+ op->RunningGauge()->Decrement();
+ MonoTime end_time(MonoTime::Now(MonoTime::FINE));
+ MonoDelta delta(end_time.GetDeltaSince(start_time));
+ std::lock_guard<Mutex> guard(lock_);
+
+ CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
+ completed_op.name = op->name();
+ completed_op.duration = delta;
+ completed_op.start_mono_time = start_time;
+ completed_ops_count_++;
+
+ op->DurationHistogram()->Increment(delta.ToMilliseconds());
+
+ running_ops_--;
+ op->running_--;
+ op->cond_->Signal();
+}
+
+void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
+ DCHECK(out_pb != nullptr);
+ std::lock_guard<Mutex> guard(lock_);
+ MaintenanceOp* best_op = FindBestOp();
+ for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+ MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
+ MaintenanceOp* op(val.first);
+ MaintenanceOpStats& stat(val.second);
+ op_pb->set_name(op->name());
+ op_pb->set_running(op->running());
+ if (stat.valid()) {
+ op_pb->set_runnable(stat.runnable());
+ op_pb->set_ram_anchored_bytes(stat.ram_anchored());
+ op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
+ op_pb->set_perf_improvement(stat.perf_improvement());
+ } else {
+ op_pb->set_runnable(false);
+ op_pb->set_ram_anchored_bytes(0);
+ op_pb->set_logs_retained_bytes(0);
+ op_pb->set_perf_improvement(0.0);
+ }
+
+ if (best_op == op) {
+ out_pb->mutable_best_op()->CopyFrom(*op_pb);
+ }
+ }
+
+ for (const CompletedOp& completed_op : completed_ops_) {
+ if (!completed_op.name.empty()) {
+ MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations();
+ completed_pb->set_name(completed_op.name);
+ completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds());
+
+ MonoDelta delta(MonoTime::Now(MonoTime::FINE).GetDeltaSince(completed_op.start_mono_time));
+ completed_pb->set_secs_since_start(delta.ToSeconds());
+ }
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
new file mode 100644
index 0000000..ef3229e
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.h
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Histogram;
+class MaintenanceManager;
+class MemTracker;
+
+class MaintenanceOpStats {
+ public:
+ MaintenanceOpStats();
+
+ // Zero all stats. They are invalid until the first setter is called.
+ void Clear();
+
+ bool runnable() const {
+ DCHECK(valid_);
+ return runnable_;
+ }
+
+ void set_runnable(bool runnable) {
+ UpdateLastModified();
+ runnable_ = runnable;
+ }
+
+ uint64_t ram_anchored() const {
+ DCHECK(valid_);
+ return ram_anchored_;
+ }
+
+ void set_ram_anchored(uint64_t ram_anchored) {
+ UpdateLastModified();
+ ram_anchored_ = ram_anchored;
+ }
+
+ int64_t logs_retained_bytes() const {
+ DCHECK(valid_);
+ return logs_retained_bytes_;
+ }
+
+ void set_logs_retained_bytes(int64_t logs_retained_bytes) {
+ UpdateLastModified();
+ logs_retained_bytes_ = logs_retained_bytes;
+ }
+
+ double perf_improvement() const {
+ DCHECK(valid_);
+ return perf_improvement_;
+ }
+
+ void set_perf_improvement(double perf_improvement) {
+ UpdateLastModified();
+ perf_improvement_ = perf_improvement;
+ }
+
+ const MonoTime& last_modified() const {
+ DCHECK(valid_);
+ return last_modified_;
+ }
+
+ bool valid() const {
+ return valid_;
+ }
+
+ private:
+ void UpdateLastModified() {
+ valid_ = true;
+ last_modified_ = MonoTime::Now(MonoTime::FINE);
+ }
+
+ // True if these stats are valid.
+ bool valid_;
+
+ // True if this op can be run now.
+ bool runnable_;
+
+ // The approximate amount of memory that not doing this operation keeps
+ // around. This number is used to decide when to start freeing memory, so it
+ // should be fairly accurate. May be 0.
+ uint64_t ram_anchored_;
+
+ // The approximate amount of disk space that not doing this operation keeps us from GCing from
+ // the logs. May be 0.
+ int64_t logs_retained_bytes_;
+
+ // The estimated performance improvement-- how good it is to do this on some
+ // absolute scale (yet TBD).
+ double perf_improvement_;
+
+ // The last time that the stats were modified.
+ MonoTime last_modified_;
+};
+
+// MaintenanceOp objects represent background operations that the
+// MaintenanceManager can schedule. Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics. The registrant is
+// responsible for managing the memory associated with the MaintenanceOp object.
+// Op objects should be unregistered before being de-allocated.
+class MaintenanceOp {
+ public:
+ friend class MaintenanceManager;
+
+ // General indicator of how much IO the Op will use.
+ enum IOUsage {
+ LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
+ HIGH_IO_USAGE // Everything else.
+ };
+
+ explicit MaintenanceOp(std::string name, IOUsage io_usage);
+ virtual ~MaintenanceOp();
+
+ // Unregister this op, if it is currently registered.
+ void Unregister();
+
+ // Update the op statistics. This will be called every scheduling period
+ // (about a few times a second), so it should not be too expensive. It's
+ // possible for the returned statistics to be invalid; the caller should
+ // call MaintenanceOpStats::valid() before using them. This will be run
+ // under the MaintenanceManager lock.
+ virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
+
+ // Prepare to perform the operation. This will be run without holding the
+ // maintenance manager lock. It should be short, since it is run from the
+ // context of the maintenance op scheduler thread rather than a worker thread.
+ // If this returns false, we will abort the operation.
+ virtual bool Prepare() = 0;
+
+ // Perform the operation. This will be run without holding the maintenance
+ // manager lock, and may take a long time.
+ virtual void Perform() = 0;
+
+ // Returns the histogram for this op that tracks duration. Cannot be NULL.
+ virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
+
+ // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+
+ uint32_t running() { return running_; }
+
+ std::string name() const { return name_; }
+
+ IOUsage io_usage() const { return io_usage_; }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
+
+ // The name of the operation. Op names must be unique.
+ const std::string name_;
+
+ // The number of times that this op is currently running.
+ uint32_t running_;
+
+ // Condition variable which the UnregisterOp function can wait on.
+ //
+ // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
+ // it only exists when the op is registered.
+ gscoped_ptr<ConditionVariable> cond_;
+
+ // The MaintenanceManager with which this op is registered, or null
+ // if it is not registered.
+ std::shared_ptr<MaintenanceManager> manager_;
+
+ IOUsage io_usage_;
+};
+
+struct MaintenanceOpComparator {
+ bool operator() (const MaintenanceOp* lhs,
+ const MaintenanceOp* rhs) const {
+ return lhs->name().compare(rhs->name()) < 0;
+ }
+};
+
+// Holds the information regarding a recently completed operation.
+struct CompletedOp {
+ std::string name;
+ MonoDelta duration;
+ MonoTime start_mono_time;
+};
+
+// The MaintenanceManager manages the scheduling of background operations such
+// as flushes or compactions. It runs these operations in the background, in a
+// thread pool. It uses information provided in MaintenanceOpStats objects to
+// decide which operations, if any, to run.
+class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
+ public:
+ struct Options {
+ int32_t num_threads;
+ int32_t polling_interval_ms;
+ uint32_t history_size;
+ std::shared_ptr<MemTracker> parent_mem_tracker;
+ };
+
+ explicit MaintenanceManager(const Options& options);
+ ~MaintenanceManager();
+
+ Status Init();
+ void Shutdown();
+
+ // Register an op with the manager.
+ void RegisterOp(MaintenanceOp* op);
+
+ // Unregister an op with the manager.
+ // If the Op is currently running, it will not be interrupted. However, this
+ // function will block until the Op is finished.
+ void UnregisterOp(MaintenanceOp* op);
+
+ void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+
+ static const Options DEFAULT_OPTIONS;
+
+ private:
+ FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+ typedef std::map<MaintenanceOp*, MaintenanceOpStats,
+ MaintenanceOpComparator> OpMapTy;
+
+ void RunSchedulerThread();
+
+ // find the best op, or null if there is nothing we want to run
+ MaintenanceOp* FindBestOp();
+
+ void LaunchOp(MaintenanceOp* op);
+
+ const int32_t num_threads_;
+ OpMapTy ops_; // registered operations
+ Mutex lock_;
+ scoped_refptr<kudu::Thread> monitor_thread_;
+ gscoped_ptr<ThreadPool> thread_pool_;
+ ConditionVariable cond_;
+ bool shutdown_;
+ uint64_t running_ops_;
+ int32_t polling_interval_ms_;
+ // Vector used as a circular buffer for recently completed ops. Elements need to be added at
+ // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
+ std::vector<CompletedOp> completed_ops_;
+ int64_t completed_ops_count_;
+ std::shared_ptr<MemTracker> parent_mem_tracker_;
+
+ DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
+};
+
+} // namespace kudu
[5/6] incubator-kudu git commit: Integrate the result tracker with
writes
Posted by mp...@apache.org.
Integrate the result tracker with writes
This patch integrates the result tracker with write transactions,
including:
- Support for caching responses on tablet bootstrap.
- Support for caching responses for follower transactions.
- Handling of races between client originated and (previous?) leader
originated transactions.
- An explanation of the interaction between the result tracker
and the transaction driver in transaction_driver.h.
For integration tests, this patch removes the check that allowed
Status::AlreadyPresent() that we added as we didn't have support for
exactly once semantics. Without this check, in slow mode, some tests
like raft_consensus-itest would fail 100% of the time, due to rows
being inserted multiple times.
Because we'd have 100% failure rate without replay cache for
writes and because testing it specifically is involved, this patch
doesn't include additional tests, these will be pushed as a follow up.
Change-Id: I1fa2f8db33653960f4749237b8993baba0929893
Reviewed-on: http://gerrit.cloudera.org:8080/3449
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6d2679bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6d2679bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6d2679bd
Branch: refs/heads/master
Commit: 6d2679bd14fdf454f2b194c83cd90316776d4dfe
Parents: 0412fa7
Author: dralves <dr...@apache.org>
Authored: Fri Jul 8 20:36:30 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Jul 15 00:10:44 2016 +0000
----------------------------------------------------------------------
src/kudu/client/batcher.cc | 3 +-
src/kudu/consensus/consensus.proto | 4 +
src/kudu/integration-tests/test_workload.cc | 12 ++-
src/kudu/integration-tests/test_workload.h | 10 ++
src/kudu/master/sys_catalog.cc | 7 +-
src/kudu/rpc/rpc_context.cc | 17 ++++
src/kudu/rpc/rpc_context.h | 6 ++
src/kudu/tablet/tablet_bootstrap-test.cc | 1 +
src/kudu/tablet/tablet_bootstrap.cc | 60 ++++++++++--
src/kudu/tablet/tablet_bootstrap.h | 5 +
src/kudu/tablet/tablet_peer-test.cc | 3 +
src/kudu/tablet/tablet_peer.cc | 17 +++-
src/kudu/tablet/tablet_peer.h | 5 +
src/kudu/tablet/transactions/transaction.h | 35 +++++++
.../tablet/transactions/transaction_driver.cc | 98 +++++++++++++++++++-
.../tablet/transactions/transaction_driver.h | 93 +++++++++++++++++++
.../tablet/transactions/write_transaction.cc | 12 ++-
.../tablet/transactions/write_transaction.h | 11 ++-
src/kudu/tools/insert-generated-rows.cc | 6 +-
src/kudu/tserver/CMakeLists.txt | 1 +
.../tserver/remote_bootstrap_session-test.cc | 6 +-
src/kudu/tserver/tablet_service.cc | 13 ++-
src/kudu/tserver/ts_tablet_manager.cc | 6 ++
src/kudu/tserver/ts_tablet_manager.h | 4 +
src/kudu/tserver/tserver_service.proto | 5 +-
25 files changed, 399 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index e15d135..8b68c5b 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -299,8 +299,7 @@ string WriteRpc::ToString() const {
}
void WriteRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
- VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica "
- << replica->ToString();
+ VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica " << replica->ToString();
replica->proxy()->WriteAsync(req_, &resp_,
mutable_retrier()->mutable_controller(),
callback);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 3439d5b..d8fb1d3 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -22,6 +22,7 @@ import "kudu/common/common.proto";
import "kudu/common/wire_protocol.proto";
import "kudu/consensus/metadata.proto";
import "kudu/consensus/opid.proto";
+import "kudu/rpc/rpc_header.proto";
import "kudu/tablet/metadata.proto";
import "kudu/tablet/tablet.proto";
import "kudu/tserver/tserver_admin.proto";
@@ -177,6 +178,9 @@ message ReplicateMsg {
optional tserver.AlterSchemaRequestPB alter_schema_request = 6;
optional ChangeConfigRecordPB change_config_record = 7;
+ // The client's request id for this message, if it is set.
+ optional rpc.RequestIdPB request_id = 8;
+
optional NoOpRequestPB noop_request = 999;
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 541647a..e3e83ad 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -58,6 +58,7 @@ TestWorkload::TestWorkload(ExternalMiniCluster* cluster)
write_timeout_millis_(20000),
timeout_allowed_(false),
not_found_allowed_(false),
+ already_present_allowed_(false),
num_replicas_(3),
num_tablets_(1),
table_name_(kDefaultTableName),
@@ -148,11 +149,12 @@ void TestWorkload::WriteThread() {
if (not_found_allowed_ && e->status().IsNotFound()) {
continue;
}
- // We don't handle write idempotency yet. (i.e making sure that when a leader fails
- // writes to it that were eventually committed by the new leader but un-ackd to the
- // client are not retried), so some errors are expected.
- // It's OK as long as the errors are Status::AlreadyPresent();
- CHECK(e->status().IsAlreadyPresent()) << "Unexpected error: " << e->status().ToString();
+
+ if (already_present_allowed_ && e->status().IsAlreadyPresent()) {
+ continue;
+ }
+
+ CHECK(e->status().ok()) << "Unexpected status: " << e->status().ToString();
}
inserted -= errors.size();
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 0c271b9..fd3ec7e 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -78,6 +78,12 @@ class TestWorkload {
not_found_allowed_ = allowed;
}
+ // Whether per-row errors with Status::AlreadyPresent() are allowed.
+ // By default this triggers a check failure.
+ void set_already_present_allowed(bool allowed) {
+ already_present_allowed_ = allowed;
+ }
+
void set_num_replicas(int r) {
num_replicas_ = r;
}
@@ -116,6 +122,9 @@ class TestWorkload {
void set_write_pattern(WritePattern pattern) {
write_pattern_ = pattern;
+ // Since we're writing with dup keys we will get AlreadyPresent() errors on the response
+ // so allow it.
+ set_already_present_allowed(true);
}
// Sets up the internal client and creates the table which will be used for
@@ -156,6 +165,7 @@ class TestWorkload {
int write_timeout_millis_;
bool timeout_allowed_;
bool not_found_allowed_;
+ bool already_present_allowed_;
WritePattern write_pattern_ = INSERT_RANDOM_ROWS;
int num_replicas_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index dba2d77..824561f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -266,6 +266,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
RETURN_NOT_OK(BootstrapTablet(metadata,
scoped_refptr<server::Clock>(master_->clock()),
master_->mem_tracker(),
+ scoped_refptr<rpc::ResultTracker>(),
metric_registry_,
tablet_peer_->status_listener(),
&tablet,
@@ -279,6 +280,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
RETURN_NOT_OK_PREPEND(tablet_peer_->Init(tablet,
scoped_refptr<server::Clock>(master_->clock()),
master_->messenger(),
+ scoped_refptr<rpc::ResultTracker>(),
log,
tablet->GetMetricEntity()),
"Failed to Init() TabletPeer");
@@ -330,7 +332,10 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB *req, WriteResponsePB *re
gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
unique_ptr<tablet::WriteTransactionState> tx_state(
- new tablet::WriteTransactionState(tablet_peer_.get(), req, resp));
+ new tablet::WriteTransactionState(tablet_peer_.get(),
+ req,
+ nullptr, // No RequestIdPB
+ resp));
tx_state->set_completion_callback(std::move(txn_callback));
RETURN_NOT_OK(tablet_peer_->SubmitWrite(std::move(tx_state)));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index c39818b..02ca2cc 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -68,6 +68,23 @@ void RpcContext::RespondSuccess() {
}
}
+void RpcContext::RespondNoCache() {
+ if (AreResultsTracked()) {
+ result_tracker_->FailAndRespond(call_->header().request_id(),
+ response_pb_.get());
+ } else {
+ VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+ << call_->ToString() << ": " << response_pb_->DebugString();
+ TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+ "response", pb_util::PbTracer::TracePb(*response_pb_),
+ "trace", trace()->DumpToString());
+ // This is a bit counter intuitive, but when we get the failure but set the error on the
+ // call's response we call RespondSuccess() instead of RespondFailure().
+ call_->RespondSuccess(*response_pb_);
+ delete this;
+ }
+}
+
void RpcContext::RespondFailure(const Status &status) {
if (AreResultsTracked()) {
result_tracker_->FailAndRespond(call_->header().request_id(),
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index ec32e8b..2ee58b7 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -83,6 +83,12 @@ class RpcContext {
// and response protobufs are also destroyed.
void RespondSuccess();
+ // Like the above, but doesn't store the results of the service call, if results
+ // are being tracked.
+ // Used in cases where a call specific error was set on the response protobuf,
+ // the call should be considered failed, thus results shouldn't be cached.
+ void RespondNoCache();
+
// Respond with an error to the client. This sends back an error with the code
// ERROR_APPLICATION. Because there is no more specific error code passed back
// to the client, most applications should create a custom error PB extension
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index 9713073..17de026 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -107,6 +107,7 @@ class BootstrapTest : public LogTestBase {
meta,
scoped_refptr<Clock>(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
shared_ptr<MemTracker>(),
+ scoped_refptr<rpc::ResultTracker>(),
NULL,
listener.get(),
tablet,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6395d4e..0fe9e09 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -41,6 +41,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/result_tracker.h"
#include "kudu/server/clock.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/tablet/lock_manager.h"
@@ -97,6 +98,7 @@ using log::LogEntryPB;
using log::LogOptions;
using log::LogReader;
using log::ReadableLogSegment;
+using rpc::ResultTracker;
using server::Clock;
using std::map;
using std::shared_ptr;
@@ -106,6 +108,8 @@ using std::unordered_map;
using strings::Substitute;
using tserver::AlterSchemaRequestPB;
using tserver::WriteRequestPB;
+using tserver::WriteResponsePB;
+
struct ReplayState;
@@ -160,6 +164,7 @@ class TabletBootstrap {
TabletBootstrap(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
shared_ptr<MemTracker> mem_tracker,
+ const scoped_refptr<ResultTracker> result_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry);
@@ -239,10 +244,11 @@ class TabletBootstrap {
const TxResultPB& result,
const vector<bool>& already_flushed);
- // Determine which of the operations from 'result' correspond to already-flushed
- // stores.
- Status DetermineFlushedOps(const TxResultPB& result,
- vector<bool>* flushed_by_op);
+ // Determine which of the operations from 'result' correspond to already-flushed stores.
+ // At the same time this builds the WriteResponsePB that we'll store on the ResultTracker.
+ Status DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
+ vector<bool>* flushed_by_op,
+ WriteResponsePB* response);
// Pass through all of the decoded operations in tx_state. For
// each op:
@@ -305,6 +311,7 @@ class TabletBootstrap {
scoped_refptr<TabletMetadata> meta_;
scoped_refptr<Clock> clock_;
shared_ptr<MemTracker> mem_tracker_;
+ scoped_refptr<rpc::ResultTracker> result_tracker_;
MetricRegistry* metric_registry_;
TabletStatusListener* listener_;
gscoped_ptr<tablet::Tablet> tablet_;
@@ -399,6 +406,7 @@ void TabletStatusListener::StatusMessage(const string& status) {
Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock,
const shared_ptr<MemTracker>& mem_tracker,
+ const scoped_refptr<ResultTracker>& result_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* listener,
shared_ptr<tablet::Tablet>* rebuilt_tablet,
@@ -407,7 +415,7 @@ Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
ConsensusBootstrapInfo* consensus_info) {
TRACE_EVENT1("tablet", "BootstrapTablet",
"tablet_id", meta->tablet_id());
- TabletBootstrap bootstrap(meta, clock, mem_tracker,
+ TabletBootstrap bootstrap(meta, clock, mem_tracker, result_tracker,
metric_registry, listener, log_anchor_registry);
RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
// This is necessary since OpenNewLog() initially disables sync.
@@ -436,11 +444,13 @@ static string DebugInfo(const string& tablet_id,
TabletBootstrap::TabletBootstrap(
const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<Clock>& clock, shared_ptr<MemTracker> mem_tracker,
+ const scoped_refptr<ResultTracker> result_tracker,
MetricRegistry* metric_registry, TabletStatusListener* listener,
const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
: meta_(meta),
clock_(clock),
mem_tracker_(std::move(mem_tracker)),
+ result_tracker_(result_tracker),
metric_registry_(metric_registry),
listener_(listener),
log_anchor_registry_(log_anchor_registry) {}
@@ -1182,13 +1192,19 @@ Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
return log_->Append(&commit_entry);
}
-Status TabletBootstrap::DetermineFlushedOps(const TxResultPB& result,
- vector<bool>* flushed_by_op) {
+Status TabletBootstrap::DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
+ vector<bool>* flushed_by_op,
+ WriteResponsePB* response) {
int num_ops = result.ops_size();
flushed_by_op->resize(num_ops);
for (int i = 0; i < num_ops; i++) {
const auto& orig_op_result = result.ops(i);
+ if (orig_op_result.has_failed_status() && response) {
+ WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
+ error->set_row_index(i);
+ error->mutable_error()->CopyFrom(orig_op_result.failed_status());
+ }
bool f;
RETURN_NOT_OK(FilterOperation(orig_op_result, &f));
(*flushed_by_op)[i] = f;
@@ -1217,12 +1233,40 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
tablet_->StartTransaction(&tx_state);
tablet_->StartApplying(&tx_state);
+ unique_ptr<WriteResponsePB> response;
+
+ bool tracking_results = result_tracker_.get() != nullptr && replicate_msg->has_request_id();
+
+ // If the results are being tracked and this write has a request id, register
+ // it with the result tracker.
+ ResultTracker::RpcState state;
+ if (tracking_results) {
+ VLOG(1) << result_tracker_.get() << " Boostrapping request for tablet: "
+ << write->tablet_id() << ". State: " << 0 << " id: "
+ << replicate_msg->request_id().DebugString();
+ // We only replay committed requests so the result of tracking this request can be:
+ // NEW - This is a previously untracked request, or we changed the driver -> store the result
+ // COMPLETED - We've bootstrapped this tablet twice, and previously stored the result -> do
+ // nothing.
+ state = result_tracker_->TrackRpcOrChangeDriver(replicate_msg->request_id());
+ CHECK(state == ResultTracker::RpcState::NEW || state == ResultTracker::RpcState::COMPLETED)
+ << "Wrong state: " << state;
+ response.reset(new WriteResponsePB());
+ response->set_timestamp(replicate_msg->timestamp());
+ }
+
// Determine which of the operations are already flushed to persistent
// storage and don't need to be re-applied. We can do this even before
// we decode any row operations, so we can short-circuit that decoding
// in the case that the entire op has been already flushed.
vector<bool> already_flushed;
- RETURN_NOT_OK(DetermineFlushedOps(commit_msg.result(), &already_flushed));
+ RETURN_NOT_OK(DetermineFlushedOpsAndBuildResponse(commit_msg.result(),
+ &already_flushed,
+ response.get()));
+
+ if (tracking_results && state == ResultTracker::NEW) {
+ result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get());
+ }
bool all_already_flushed = std::all_of(already_flushed.begin(),
already_flushed.end(),
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.h b/src/kudu/tablet/tablet_bootstrap.h
index 1566515..a7cdb6e 100644
--- a/src/kudu/tablet/tablet_bootstrap.h
+++ b/src/kudu/tablet/tablet_bootstrap.h
@@ -43,6 +43,10 @@ namespace consensus {
struct ConsensusBootstrapInfo;
} // namespace consensus
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
namespace server {
class Clock;
}
@@ -94,6 +98,7 @@ extern const char* kLogRecoveryDir;
Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
const scoped_refptr<server::Clock>& clock,
const std::shared_ptr<MemTracker>& mem_tracker,
+ const scoped_refptr<rpc::ResultTracker>& result_tracker,
MetricRegistry* metric_registry,
TabletStatusListener* status_listener,
std::shared_ptr<Tablet>* rebuilt_tablet,
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
index b1c03ef..8a04d1f 100644
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ b/src/kudu/tablet/tablet_peer-test.cc
@@ -138,6 +138,7 @@ class TabletPeerTest : public KuduTabletTest {
ASSERT_OK(tablet_peer_->Init(tablet(),
clock(),
messenger_,
+ scoped_refptr<rpc::ResultTracker>(),
log,
metric_entity_));
}
@@ -193,6 +194,7 @@ class TabletPeerTest : public KuduTabletTest {
gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer,
&req,
+ nullptr, // No RequestIdPB
resp.get()));
CountDownLatch rpc_latch(1);
@@ -464,6 +466,7 @@ TEST_F(TabletPeerTest, TestActiveTransactionPreventsLogGC) {
ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer_.get(),
req.get(),
+ nullptr, // No RequestIdPB
resp.get()));
tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index 71bb522..44d3b30 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -36,6 +36,10 @@
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_pool.h"
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
@@ -91,6 +95,7 @@ using consensus::WRITE_OP;
using log::Log;
using log::LogAnchorRegistry;
using rpc::Messenger;
+using rpc::ResultTracker;
using strings::Substitute;
using tserver::TabletServerErrorPB;
@@ -122,6 +127,7 @@ TabletPeer::~TabletPeer() {
Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
const scoped_refptr<server::Clock>& clock,
const shared_ptr<Messenger>& messenger,
+ const scoped_refptr<ResultTracker>& result_tracker,
const scoped_refptr<Log>& log,
const scoped_refptr<MetricEntity>& metric_entity) {
@@ -143,6 +149,7 @@ Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
clock_ = clock;
messenger_ = messenger;
log_ = log;
+ result_tracker_ = result_tracker;
ConsensusOptions options;
options.tablet_id = meta_->tablet_id();
@@ -319,10 +326,12 @@ Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
Status TabletPeer::SubmitWrite(unique_ptr<WriteTransactionState> state) {
RETURN_NOT_OK(CheckRunning());
+ state->SetResultTracker(result_tracker_);
gscoped_ptr<WriteTransaction> transaction(new WriteTransaction(std::move(state),
consensus::LEADER));
scoped_refptr<TransactionDriver> driver;
- RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(), &driver));
+ RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
+ &driver));
return driver->ExecuteAsync();
}
@@ -483,7 +492,11 @@ Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>&
DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
" transaction must receive a WriteRequestPB";
unique_ptr<WriteTransactionState> tx_state(
- new WriteTransactionState(this, &replicate_msg->write_request()));
+ new WriteTransactionState(
+ this,
+ &replicate_msg->write_request(),
+ replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
+ tx_state->SetResultTracker(result_tracker_);
transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
break;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
index e926f79..2ced0cb 100644
--- a/src/kudu/tablet/tablet_peer.h
+++ b/src/kudu/tablet/tablet_peer.h
@@ -42,6 +42,7 @@ class LogAnchorRegistry;
namespace rpc {
class Messenger;
+class ResultTracker;
}
namespace tserver {
@@ -78,6 +79,7 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
const scoped_refptr<server::Clock>& clock,
const std::shared_ptr<rpc::Messenger>& messenger,
+ const scoped_refptr<rpc::ResultTracker>& result_tracker,
const scoped_refptr<log::Log>& log,
const scoped_refptr<MetricEntity>& metric_entity);
@@ -330,6 +332,9 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
// can provide.
std::vector<MaintenanceOp*> maintenance_ops_;
+ // The result tracker for writes.
+ scoped_refptr<rpc::ResultTracker> result_tracker_;
+
DISALLOW_COPY_AND_ASSIGN(TabletPeer);
};
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 0a133f5..29fcbee 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -20,6 +20,7 @@
#include <string>
#include <mutex>
+#include <kudu/rpc/result_tracker.h>
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
@@ -31,6 +32,10 @@
namespace kudu {
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
namespace tablet {
class TabletPeer;
class TransactionCompletionCallback;
@@ -137,6 +142,17 @@ class TransactionState {
// This will only return a non-null object for leader-side transactions.
virtual google::protobuf::Message* response() const { return NULL; }
+ // Returns whether the results of the transaction are being tracked.
+ bool are_results_tracked() const {
+ return result_tracker_.get() != nullptr && has_request_id();
+ }
+
+ rpc::ResultTracker* result_tracker() const { return result_tracker_.get(); }
+
+ void SetResultTracker(const scoped_refptr<rpc::ResultTracker> result_tracker) {
+ result_tracker_ = result_tracker;
+ }
+
// Sets the ConsensusRound for this transaction, if this transaction is
// being executed through the consensus system.
void set_consensus_round(const scoped_refptr<consensus::ConsensusRound>& consensus_round) {
@@ -224,6 +240,19 @@ class TransactionState {
return external_consistency_mode_;
}
+ // Returns where the transaction associated with this TransactionState had an
+ // associated transaction id.
+ bool has_request_id() const {
+ return request_id_.has_client_id();
+ }
+
+ // Returns the request id for the transaction associated with this TransactionState.
+ // Not all transactions will have a request id so users of this method should call
+ // 'has_request_id()' first to make sure it is set.
+ const rpc::RequestIdPB& request_id() const {
+ return request_id_;
+ }
+
protected:
explicit TransactionState(TabletPeer* tablet_peer);
virtual ~TransactionState();
@@ -233,6 +262,9 @@ class TransactionState {
// The tablet peer that is coordinating this transaction.
TabletPeer* const tablet_peer_;
+ // The result tracker that will cache the result of this transaction.
+ scoped_refptr<rpc::ResultTracker> result_tracker_;
+
// Optional callback to be called once the transaction completes.
gscoped_ptr<TransactionCompletionCallback> completion_clbk_;
@@ -249,6 +281,9 @@ class TransactionState {
// This OpId stores the canonical "anchor" OpId for this transaction.
consensus::OpId op_id_;
+ // The client's id for this transaction, if there is one.
+ rpc::RequestIdPB request_id_;
+
scoped_refptr<consensus::ConsensusRound> consensus_round_;
// The defined consistency mode for this transaction.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index fd68ca6..1b90564 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -21,6 +21,7 @@
#include "kudu/consensus/consensus.h"
#include "kudu/gutil/strings/strcat.h"
+#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/tablet_peer.h"
#include "kudu/tablet/transactions/transaction_tracker.h"
#include "kudu/util/debug-util.h"
@@ -39,10 +40,42 @@ using consensus::ReplicateMsg;
using consensus::CommitMsg;
using consensus::DriverType;
using log::Log;
+using rpc::RequestIdPB;
+using rpc::ResultTracker;
using std::shared_ptr;
static const char* kTimestampFieldName = "timestamp";
+class FollowerTransactionCompletionCallback : public TransactionCompletionCallback {
+ public:
+ FollowerTransactionCompletionCallback(const RequestIdPB& request_id,
+ const google::protobuf::Message* response,
+ const scoped_refptr<ResultTracker>& result_tracker)
+ : request_id_(request_id),
+ response_(response),
+ result_tracker_(result_tracker) {}
+
+ virtual void TransactionCompleted() {
+ if (status_.ok()) {
+ result_tracker_->RecordCompletionAndRespond(request_id_, response_);
+ } else {
+ // For now we always respond with TOO_BUSY, meaning the client will retry (even if
+ // this is an unretryable failure), that works as the client-driven version of this
+ // transaction will get the right error.
+ result_tracker_->FailAndRespond(request_id_,
+ rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+ status_);
+ }
+ }
+
+ virtual ~FollowerTransactionCompletionCallback() {}
+
+ private:
+ const RequestIdPB& request_id_;
+ const google::protobuf::Message* response_;
+ scoped_refptr<ResultTracker> result_tracker_;
+};
+
////////////////////////////////////////////////////////////
// TransactionDriver
@@ -79,6 +112,17 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
DCHECK(op_id_copy_.IsInitialized());
replication_state_ = REPLICATING;
replication_start_time_ = MonoTime::Now(MonoTime::FINE);
+ if (state()->are_results_tracked()) {
+ // If this is a follower transaction, make sure to set the transaction completion callback
+ // before the transaction has a chance to fail.
+ const rpc::RequestIdPB& request_id = state()->request_id();
+ const google::protobuf::Message* response = state()->response();
+ gscoped_ptr<TransactionCompletionCallback> callback(
+ new FollowerTransactionCompletionCallback(request_id,
+ response,
+ state()->result_tracker()));
+ mutable_state()->set_completion_callback(callback.Pass());
+ }
} else {
DCHECK_EQ(type, consensus::LEADER);
gscoped_ptr<ReplicateMsg> replicate_msg;
@@ -162,16 +206,42 @@ void TransactionDriver::PrepareAndStartTask() {
}
}
+void TransactionDriver::RegisterFollowerTransactionOnResultTracker() {
+ // If this is a transaction being executed by a follower and its result is being
+ // tracked, make sure that we're the driver of the transaction.
+ if (!state()->are_results_tracked()) return;
+
+ ResultTracker::RpcState rpc_state = state()->result_tracker()->TrackRpcOrChangeDriver(
+ state()->request_id());
+ switch (rpc_state) {
+ case ResultTracker::RpcState::NEW:
+ // We're the only ones trying to execute the transaction (normal case). Proceed.
+ return;
+ // If this RPC was previously completed (like if the same tablet was bootstrapped twice)
+ // stop tracking the result. Only follower transactions can observe this state so we
+ // simply reset the callback and the result will not be tracked anymore.
+ case ResultTracker::RpcState::COMPLETED: {
+ mutable_state()->set_completion_callback(
+ gscoped_ptr<TransactionCompletionCallback>(new TransactionCompletionCallback()));
+ VLOG(1) << state()->result_tracker() << " Follower Rpc was not NEW or IN_PROGRESS: "
+ << rpc_state << " OpId: " << state()->op_id().ShortDebugString()
+ << " RequestId: " << state()->request_id().ShortDebugString();
+ return;
+ }
+ default:
+ LOG(FATAL) << "Unexpected state: " << rpc_state;
+ }
+}
+
Status TransactionDriver::PrepareAndStart() {
TRACE_EVENT1("txn", "PrepareAndStart", "txn", this);
VLOG_WITH_PREFIX(4) << "PrepareAndStart()";
// Actually prepare and start the transaction.
prepare_physical_timestamp_ = GetMonoTimeMicros();
- RETURN_NOT_OK(transaction_->Prepare());
+ RETURN_NOT_OK(transaction_->Prepare());
RETURN_NOT_OK(transaction_->Start());
-
// Only take the lock long enough to take a local copy of the
// replication state and set our prepare state. This ensures that
// exactly one of Replicate/Prepare callbacks will trigger the apply
@@ -182,12 +252,30 @@ Status TransactionDriver::PrepareAndStart() {
CHECK_EQ(prepare_state_, NOT_PREPARED);
prepare_state_ = PREPARED;
repl_state_copy = replication_state_;
+
+ // If this is a follower transaction we need to register the transaction on the tracker here,
+ // atomically with the change of the prepared state. Otherwise if the prepare thread gets
+ // preempted after the state is prepared apply can be triggered by another thread without the
+ // rpc being registered.
+ if (transaction_->type() == consensus::REPLICA) {
+ RegisterFollowerTransactionOnResultTracker();
+ // ... else we're a client-started transaction. Make sure we're still the driver of the
+ // RPC and give up if we aren't.
+ } else {
+ if (state()->are_results_tracked()
+ && !state()->result_tracker()->IsCurrentDriver(state()->request_id())) {
+ transaction_status_ = Status::AlreadyPresent(strings::Substitute(
+ "There's already an attempt of the same operation on the server for request id: $0",
+ state()->request_id().ShortDebugString()));
+ replication_state_ = REPLICATION_FAILED;
+ return transaction_status_;
+ }
+ }
}
switch (repl_state_copy) {
case NOT_REPLICATING:
{
-
// Set the timestamp in the message, now that it's prepared.
transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
transaction_->state()->timestamp().ToUint64());
@@ -200,8 +288,8 @@ Status TransactionDriver::PrepareAndStart() {
replication_state_ = REPLICATING;
replication_start_time_ = MonoTime::Now(MonoTime::FINE);
}
- Status s = consensus_->Replicate(mutable_state()->consensus_round());
+ Status s = consensus_->Replicate(mutable_state()->consensus_round());
if (PREDICT_FALSE(!s.ok())) {
std::lock_guard<simple_spinlock> lock(lock_);
CHECK_EQ(replication_state_, REPLICATING);
@@ -244,7 +332,6 @@ void TransactionDriver::HandleFailure(const Status& s) {
repl_state_copy = replication_state_;
}
-
switch (repl_state_copy) {
case NOT_REPLICATING:
case REPLICATION_FAILED:
@@ -297,6 +384,7 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
replication_duration = replication_finished_time.GetDeltaSince(replication_start_time_);
}
+
TRACE_COUNTER_INCREMENT("replication_time_us", replication_duration.ToMicroseconds());
// If we have prepared and replicated, we're ready
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 874fd55..0501299 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -19,6 +19,7 @@
#define KUDU_TABLET_TRANSACTION_DRIVER_H_
#include <string>
+#include <kudu/rpc/result_tracker.h>
#include "kudu/consensus/consensus.h"
#include "kudu/gutil/ref_counted.h"
@@ -34,6 +35,10 @@ namespace log {
class Log;
} // namespace log
+namespace rpc {
+class ResultTracker;
+}
+
namespace tablet {
class TransactionOrderVerifier;
class TransactionTracker;
@@ -89,6 +94,89 @@ class TransactionTracker;
//
// [1] - see 'Implementation Techniques for Main Memory Database Systems', DeWitt et. al.
//
+// ===========================================================================================
+//
+// Tracking transaction results for exactly once semantics
+//
+// Exactly once semantics for transactions require that the results of previous executions
+// of a transaction be cached and replayed to the client, when a duplicate request is received.
+// For single server operations, this can be encapsulated on the rpc layer, but for replicated
+// ones, like transactions, it needs additional care, as multiple copies of an RPC can arrive
+// from different sources. For instance a client might be retrying an operation on a different
+// tablet server, while that tablet server actually received the same operation from a previous
+// leader.
+//
+// The prepare phase of transactions is single threaded, so it's an ideal place to register
+// follower transactions with the result tracker and deduplicate possible multiple attempts.
+// However rpc's from clients are first registered as they first arrive, outside of the prepare
+// phase, and requests from another replica might arrive just as we're becoming leader, so we
+// need to account for all the possible interleavings.
+//
+// Relevant constraints:
+//
+// 1 - Before a replica becomes leader (i.e. accepts new client requests), it has already enqueued
+// all current requests on the prepare queue.
+//
+// 2 - If a replica is not leader it rejects client requests on the prepare phase.
+//
+// 3 - Replicated transaction, i.e. ones received as a follower of another (leader) replica, are
+// registered with the result tracker on the prepare phase itself. This constrains the possible
+// interleavings because when a client-originated request prepares, it will either observe a
+// previous request and abort, or it won't observe it and know it is the first attempt at
+// executing that transaction.
+//
+// Given these constraints the following interleavings might happen, on a newly elected leader:
+//
+// CR - Client Request
+// RR - Replica Request
+//
+//
+// a) --------- b) ---------
+// | |
+// RR prepares->1 CR arrives->1
+// | |
+// 2<-CR arrives 2<-RR prepares
+// | |
+// 3<-CR attaches 3<-CR prepares/aborts
+// | |
+// RR completes->4 CR retries->4
+// | |
+// CR replies cached->5 CR attaches/replies cached->5
+// | |
+// --------- ---------
+//
+// Case a) is the simplest. The client retries a request (2) after the duplicate request from the
+// previous leader has prepared (1). When trying to register the result in the ResultTracker (2)
+// the client's request will be "attached" (3) to the execution of the previously running request.
+// When it completes (4) the client will receive the cached response (5).
+//
+// Case b) is a slightly more complex. When the client request arrives (1) the request from the
+// previous leader hasn't yet prepared, meaning that the client request will be marked as a NEW
+// request and its handler will proceed to try and prepare. In the meanwhile the replica request
+// prepares (2). Since the replica request must take precedence over the client's request, the
+// replica request must be the one to actually execute the operations, i.e. it must force the client
+// request's handler to abort (3).
+// It does this in two ways:
+//
+// - It immediately calls ResultTracker::TrackRpcOrChangeDriver(), this will make sure that it is
+// registered as the handler driver and that it is the only one whose response will be stored.
+//
+// - Later on, when the handler for the client request finally tries to prepare (3), it will observe
+// that it is no longer the driver of the transaction (his attempt_no has been replaced by
+// the replica request's attempt_no) and will later call ResultTracker::FailAndRespond()
+// causing the client to retry later (4,5).
+//
+// After the client receives the error sent on (2) it retries the operation (4) which will then
+// attach to the replica request's execution and receive the same response when it is done (5).
+//
+// Additional notes:
+//
+// The above diagrams don't distinguish between arrive/prepare for replica requests as registering
+// with the result tracker and preparing in that case happen atomically.
+//
+// If the client request was to proceed to prepare _before_ the replica request prepared, it would
+// still be ok, as it would get aborted because the replica wasn't a leader yet (constraints 1/2).
+//
// This class is thread safe.
class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
@@ -212,6 +300,11 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
void SetResponseTimestamp(TransactionState* transaction_state,
const Timestamp& timestamp);
+ // If this driver is executing a follower transaction then it is possible
+ // it never went through the rpc system so we have to register it with the
+ // ResultTracker.
+ void RegisterFollowerTransactionOnResultTracker();
+
TransactionTracker* const txn_tracker_;
consensus::Consensus* const consensus_;
log::Log* const log_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index df4bb33..6b4c0c6 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -65,6 +65,9 @@ void WriteTransaction::NewReplicateMsg(gscoped_ptr<ReplicateMsg>* replicate_msg)
replicate_msg->reset(new ReplicateMsg);
(*replicate_msg)->set_op_type(WRITE_OP);
(*replicate_msg)->mutable_write_request()->CopyFrom(*state()->request());
+ if (state()->are_results_tracked()) {
+ (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
+ }
}
Status WriteTransaction::Prepare() {
@@ -126,7 +129,7 @@ Status WriteTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
// Add per-row errors to the result, update metrics.
int i = 0;
for (const RowOp* op : state()->row_ops()) {
- if (state()->response() != nullptr && op->result->has_failed_status()) {
+ if (op->result->has_failed_status()) {
// Replicas disregard the per row errors, for now
// TODO check the per-row errors against the leader's, at least in debug mode
WriteResponsePB::PerRowErrorPB* error = state()->response()->add_per_row_errors();
@@ -201,6 +204,7 @@ string WriteTransaction::ToString() const {
WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
const tserver::WriteRequestPB *request,
+ const rpc::RequestIdPB* request_id,
tserver::WriteResponsePB *response)
: TransactionState(tablet_peer),
request_(DCHECK_NOTNULL(request)),
@@ -208,6 +212,12 @@ WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
mvcc_tx_(nullptr),
schema_at_decode_time_(nullptr) {
external_consistency_mode_ = request_->external_consistency_mode();
+ if (!response_) {
+ response_ = &owned_response_;
+ }
+ if (request_id) {
+ request_id_ = *request_id;
+ }
}
void WriteTransactionState::SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx) {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 134c04b..05f84ff 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -74,6 +74,7 @@ class WriteTransactionState : public TransactionState {
public:
WriteTransactionState(TabletPeer* tablet_peer,
const tserver::WriteRequestPB *request,
+ const rpc::RequestIdPB* request_id,
tserver::WriteResponsePB *response = NULL);
virtual ~WriteTransactionState();
@@ -185,9 +186,13 @@ class WriteTransactionState : public TransactionState {
// from the request).
void ResetRpcFields();
- // pointers to the rpc context, request and response, lifecyle
- // is managed by the rpc subsystem. These pointers maybe NULL if the
- // transaction was not initiated by an RPC call.
+ // An owned version of the response, for follower transactions.
+ tserver::WriteResponsePB owned_response_;
+
+ // The lifecycle of these pointers request and response, is not managed by this class.
+ // These pointers are never null: 'request_' is always set on construction and 'response_' is
+ // either set to the response passed on the ctor, if there is one, or to point to
+ // 'owned_response_' if there isn't.
const tserver::WriteRequestPB* request_;
tserver::WriteResponsePB* response_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tools/insert-generated-rows.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/insert-generated-rows.cc b/src/kudu/tools/insert-generated-rows.cc
index 8e59857..f0b0b1b 100644
--- a/src/kudu/tools/insert-generated-rows.cc
+++ b/src/kudu/tools/insert-generated-rows.cc
@@ -107,11 +107,7 @@ static int WriteRandomDataToTable(int argc, char** argv) {
session->GetPendingErrors(&errors, &overflow);
CHECK(!overflow);
for (const client::KuduError* e : errors) {
- if (e->status().IsAlreadyPresent()) {
- LOG(WARNING) << "Ignoring insert error: " << e->status().ToString();
- } else {
- LOG(FATAL) << "Unexpected insert error: " << e->status().ToString();
- }
+ LOG(FATAL) << "Unexpected insert error: " << e->status().ToString();
}
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 6b104b2..8568fea 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -47,6 +47,7 @@ PROTOBUF_GENERATE_CPP(
PROTO_FILES tserver.proto)
set(TSERVER_PROTO_LIBS
kudu_common_proto
+ krpc
consensus_metadata_proto
tablet_proto
wire_protocol_proto)
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/remote_bootstrap_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_session-test.cc b/src/kudu/tserver/remote_bootstrap_session-test.cc
index bd34df8..d354699 100644
--- a/src/kudu/tserver/remote_bootstrap_session-test.cc
+++ b/src/kudu/tserver/remote_bootstrap_session-test.cc
@@ -133,6 +133,7 @@ class RemoteBootstrapTest : public KuduTabletTest {
ASSERT_OK(tablet_peer_->Init(tablet(),
clock(),
messenger,
+ scoped_refptr<rpc::ResultTracker>(),
log,
metric_entity));
consensus::ConsensusBootstrapInfo boot_info;
@@ -163,7 +164,10 @@ class RemoteBootstrapTest : public KuduTabletTest {
CountDownLatch latch(1);
unique_ptr<tablet::WriteTransactionState> state(
- new tablet::WriteTransactionState(tablet_peer_.get(), &req, &resp));
+ new tablet::WriteTransactionState(tablet_peer_.get(),
+ &req,
+ nullptr, // No RequestIdPB
+ &resp));
state->set_completion_callback(gscoped_ptr<tablet::TransactionCompletionCallback>(
new tablet::LatchTransactionCompletionCallback<WriteResponsePB>(&latch, &resp)));
ASSERT_OK(tablet_peer_->SubmitWrite(std::move(state)));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index f1f9738..85e24a2 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -259,10 +259,7 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error,
StatusToPB(s, error->mutable_status());
error->set_code(code);
- // TODO: rename RespondSuccess() to just "Respond" or
- // "SendResponse" since we use it for application-level error
- // responses, and this just looks confusing!
- context->RespondSuccess();
+ context->RespondNoCache();
}
template <class ReqType, class RespType>
@@ -732,9 +729,11 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
return;
}
- unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer.get(),
- req,
- resp));
+ unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(
+ tablet_peer.get(),
+ req,
+ context->AreResultsTracked() ? context->request_id() : nullptr,
+ resp));
// If the client sent us a timestamp, decode it and update the clock so that all future
// timestamps are greater than the passed timestamp.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index a528871..319011a 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -36,6 +36,8 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/master/master.pb.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet.pb.h"
@@ -45,6 +47,7 @@
#include "kudu/tserver/heartbeater.h"
#include "kudu/tserver/remote_bootstrap_client.h"
#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tablet_service.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
@@ -126,6 +129,7 @@ using consensus::StartRemoteBootstrapRequestPB;
using log::Log;
using master::ReportedTabletPB;
using master::TabletReportPB;
+using rpc::ResultTracker;
using std::shared_ptr;
using std::string;
using std::vector;
@@ -640,6 +644,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
s = BootstrapTablet(meta,
scoped_refptr<server::Clock>(server_->clock()),
server_->mem_tracker(),
+ server_->result_tracker(),
metric_registry_,
tablet_peer->status_listener(),
&tablet,
@@ -660,6 +665,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
s = tablet_peer->Init(tablet,
scoped_refptr<server::Clock>(server_->clock()),
server_->messenger(),
+ server_->result_tracker(),
log,
tablet->GetMetricEntity());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 820016f..a0d362b 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -52,6 +52,10 @@ class ReportedTabletPB;
class TabletReportPB;
} // namespace master
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
namespace tablet {
class TabletMetadata;
class TabletPeer;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/tserver_service.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto
index b6010fd..eba8fa9 100644
--- a/src/kudu/tserver/tserver_service.proto
+++ b/src/kudu/tserver/tserver_service.proto
@@ -18,12 +18,15 @@ package kudu.tserver;
option java_package = "org.kududb.tserver";
+import "kudu/rpc/rpc_header.proto";
import "kudu/tserver/tserver.proto";
service TabletServerService {
rpc Ping(PingRequestPB) returns (PingResponsePB);
- rpc Write(WriteRequestPB) returns (WriteResponsePB);
+ rpc Write(WriteRequestPB) returns (WriteResponsePB) {
+ option (kudu.rpc.track_rpc_result) = true;
+ }
rpc Scan(ScanRequestPB) returns (ScanResponsePB);
rpc ScannerKeepAlive(ScannerKeepAliveRequestPB) returns (ScannerKeepAliveResponsePB);
rpc ListTablets(ListTabletsRequestPB) returns (ListTabletsResponsePB);