You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2016/07/15 00:57:43 UTC

[1/6] incubator-kudu git commit: Disable exactly once semantics by default and add a flag to enable it for tests

Repository: incubator-kudu
Updated Branches:
  refs/heads/master b666cc07e -> 59ab14b9a


Disable exactly once semantics by default and add a flag to enable it for tests

Since exactly-once semantics is still missing some pieces, like garbage collection,
this disables it by default on the server but adds a flag to enable it, and
enables the flag by default in all tablet server tests.

Change-Id: I77096be608afb31194f62f04a946bd3f42537a35
Reviewed-on: http://gerrit.cloudera.org:8080/3506
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/257ba292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/257ba292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/257ba292

Branch: refs/heads/master
Commit: 257ba2923ad962193fbca49b3d22b9fb6a578509
Parents: b666cc0
Author: David Alves <da...@cloudera.com>
Authored: Mon Jun 27 02:09:09 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 22:57:20 2016 +0000

----------------------------------------------------------------------
 .../integration-tests/external_mini_cluster-itest-base.h  |  2 ++
 src/kudu/integration-tests/remote_bootstrap-itest.cc      |  3 +++
 src/kudu/integration-tests/ts_itest-base.h                |  4 ++++
 src/kudu/rpc/rpc-test-base.h                              |  3 +++
 src/kudu/rpc/service_if.cc                                | 10 +++++++++-
 src/kudu/tserver/tablet_server-test-base.h                |  5 +++++
 6 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/external_mini_cluster-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.h b/src/kudu/integration-tests/external_mini_cluster-itest-base.h
index e2a9cf5..0fde8d4 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.h
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.h
@@ -77,6 +77,8 @@ void ExternalMiniClusterITestBase::StartCluster(const std::vector<std::string>&
   opts.num_tablet_servers = num_tablet_servers;
   opts.extra_master_flags = extra_master_flags;
   opts.extra_tserver_flags = extra_ts_flags;
+  // TODO remove when this is enabled by default.
+  opts.extra_tserver_flags.push_back("--enable_exactly_once");
   opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
   cluster_.reset(new ExternalMiniCluster(opts));
   ASSERT_OK(cluster_->Start());

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/remote_bootstrap-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/remote_bootstrap-itest.cc b/src/kudu/integration-tests/remote_bootstrap-itest.cc
index e9bf883..661ed21 100644
--- a/src/kudu/integration-tests/remote_bootstrap-itest.cc
+++ b/src/kudu/integration-tests/remote_bootstrap-itest.cc
@@ -112,6 +112,9 @@ void RemoteBootstrapITest::StartCluster(const vector<string>& extra_tserver_flag
   ExternalMiniClusterOptions opts;
   opts.num_tablet_servers = num_tablet_servers;
   opts.extra_tserver_flags = extra_tserver_flags;
+  // Enable EO semantics for tests.
+  // TODO remove this once EO is the default.
+  opts.extra_tserver_flags.push_back("--enable_exactly_once");
   opts.extra_tserver_flags.push_back("--never_fsync"); // fsync causes flakiness on EC2.
   opts.extra_master_flags = extra_master_flags;
   cluster_.reset(new ExternalMiniCluster(opts));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/integration-tests/ts_itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_itest-base.h b/src/kudu/integration-tests/ts_itest-base.h
index ad38b3e..225c942 100644
--- a/src/kudu/integration-tests/ts_itest-base.h
+++ b/src/kudu/integration-tests/ts_itest-base.h
@@ -97,6 +97,10 @@ class TabletServerIntegrationTestBase : public TabletServerTestBase {
     opts.num_tablet_servers = FLAGS_num_tablet_servers;
     opts.data_root = GetTestPath(data_root_path);
 
+    // Enable exactly once semantics for tests.
+    // TODO remove this once we have ResultTracker GC
+    opts.extra_tserver_flags.push_back("--enable_exactly_once");
+
     // If the caller passed no flags use the default ones, where we stress consensus by setting
     // low timeouts and frequent cache misses.
     if (non_default_ts_flags.empty()) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index ec0edc9..7b82b08 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -45,6 +45,8 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/trace.h"
 
+DECLARE_bool(enable_exactly_once);
+
 namespace kudu { namespace rpc {
 
 using kudu::rpc_test::AddRequestPB;
@@ -318,6 +320,7 @@ class RpcTestBase : public KuduTest {
       n_server_reactor_threads_(3),
       keepalive_time_ms_(1000),
       metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_, "test.rpc_test")) {
+    FLAGS_enable_exactly_once = true;
   }
 
   void SetUp() override {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/service_if.cc b/src/kudu/rpc/service_if.cc
index 4a8c387..8b47ce9 100644
--- a/src/kudu/rpc/service_if.cc
+++ b/src/kudu/rpc/service_if.cc
@@ -27,6 +27,12 @@
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/flag_tags.h"
+
+// TODO remove this once we have ResultTracker GC
+DEFINE_bool(enable_exactly_once, false, "Whether to enable exactly once semantics on the client "
+    "(experimental).");
+TAG_FLAG(enable_exactly_once, experimental);
 
 using google::protobuf::Message;
 using std::string;
@@ -92,7 +98,9 @@ void GeneratedServiceIf::Handle(InboundCall *call) {
   }
   Message* resp = method_info->resp_prototype->New();
 
-  bool track_result = call->header().has_request_id() && method_info->track_result;
+  bool track_result = call->header().has_request_id()
+                      && method_info->track_result
+                      && FLAGS_enable_exactly_once;
   RpcContext* ctx = new RpcContext(call,
                                    req.release(),
                                    resp,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/257ba292/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 8f0b757..1d5022e 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -58,6 +58,7 @@
 DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
 DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DECLARE_bool(log_force_fsync_all);
+DECLARE_bool(enable_exactly_once);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(enable_data_block_fsync);
 DECLARE_int32(heartbeat_rpc_timeout_ms);
@@ -80,6 +81,10 @@ class TabletServerTestBase : public KuduTest {
     // maintenance operations at predetermined times.
     FLAGS_enable_maintenance_manager = false;
 
+    // Enable exactly once semantics, for tests.
+    // TODO remove this once we have ResultTracker GC
+    FLAGS_enable_exactly_once = true;
+
     // Decrease heartbeat timeout: we keep re-trying heartbeats when a
     // single master server fails due to a network error. Decreasing
     // the hearbeat timeout to 1 second speeds up unit tests which


[4/6] incubator-kudu git commit: Add a way to include request ids in log-dump

Posted by mp...@apache.org.
Add a way to include request ids in log-dump

This adds a way to output the request ids, if they exist, in log-dump.cc.
This is helpful when debugging, as it shows which writes the request ids
are associated with.

Change-Id: I88d7c65887a98544ee83b5b4bc0817bea7222131
Reviewed-on: http://gerrit.cloudera.org:8080/3612
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Tested-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/0412fa76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/0412fa76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/0412fa76

Branch: refs/heads/master
Commit: 0412fa7686d70720b7ac67b66ae85dc832963636
Parents: ee38093
Author: David Alves <da...@cloudera.com>
Authored: Mon Jul 4 14:58:53 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Jul 15 00:05:41 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-dump.cc | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/0412fa76/src/kudu/consensus/log-dump.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-dump.cc b/src/kudu/consensus/log-dump.cc
index e26d778..8bab8fc 100644
--- a/src/kudu/consensus/log-dump.cc
+++ b/src/kudu/consensus/log-dump.cc
@@ -28,6 +28,7 @@
 #include "kudu/consensus/log_reader.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/numbers.h"
+#include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/flags.h"
 #include "kudu/util/logging.h"
@@ -104,7 +105,8 @@ void PrintIdOnly(const LogEntryPB& entry) {
 
 Status PrintDecodedWriteRequestPB(const string& indent,
                                   const Schema& tablet_schema,
-                                  const WriteRequestPB& write) {
+                                  const WriteRequestPB& write,
+                                  const rpc::RequestIdPB* request_id) {
   Schema request_schema;
   RETURN_NOT_OK(SchemaFromPB(write.schema(), &request_schema));
 
@@ -114,6 +116,8 @@ Status PrintDecodedWriteRequestPB(const string& indent,
   RETURN_NOT_OK(dec.DecodeOperations(&ops));
 
   cout << indent << "Tablet: " << write.tablet_id() << endl;
+  cout << indent << "RequestId: "
+      << (request_id ? request_id->ShortDebugString() : "None") << endl;
   cout << indent << "Consistency: "
        << ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
   if (write.has_propagated_timestamp()) {
@@ -139,7 +143,11 @@ Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
 
     const ReplicateMsg& replicate = entry.replicate();
     if (replicate.op_type() == consensus::WRITE_OP) {
-      RETURN_NOT_OK(PrintDecodedWriteRequestPB(indent, tablet_schema, replicate.write_request()));
+      RETURN_NOT_OK(PrintDecodedWriteRequestPB(
+          indent,
+          tablet_schema,
+          replicate.write_request(),
+          replicate.has_request_id() ? &replicate.request_id() : nullptr));
     } else {
       cout << indent << replicate.ShortDebugString() << endl;
     }


[6/6] incubator-kudu git commit: KUDU-1530: Update docs about OS X build dependency on Xcode package

Posted by mp...@apache.org.
KUDU-1530: Update docs about OS X build dependency on Xcode package

Change-Id: I8962d5539f437dba8b120c70c90c1e384ed550c9
Reviewed-on: http://gerrit.cloudera.org:8080/3653
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/59ab14b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/59ab14b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/59ab14b9

Branch: refs/heads/master
Commit: 59ab14b9a8927dd08b34c3e4e6be9649f6e7dd9c
Parents: 6d2679b
Author: Dinesh Bhat <di...@cloudera.com>
Authored: Thu Jul 14 12:25:36 2016 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jul 15 00:57:01 2016 +0000

----------------------------------------------------------------------
 docs/installation.adoc | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/59ab14b9/docs/installation.adoc
----------------------------------------------------------------------
diff --git a/docs/installation.adoc b/docs/installation.adoc
index 2c3918c..c7a343f 100644
--- a/docs/installation.adoc
+++ b/docs/installation.adoc
@@ -471,10 +471,9 @@ make -j4
 
 [[osx_from_source]]
 === OS X
-The Xcode toolchain is necessary for compiling Kudu. Use `xcode-select --install`
-to install the Xcode Command Line Tools if Xcode is not already installed. These
-instructions use link:http://brew.sh/[Homebrew] to install dependencies, but
-manual dependency installation is possible.
+The link:https://developer.apple.com/xcode/[Xcode] package is necessary for
+compiling Kudu. Some of the instructions below use link:http://brew.sh/[Homebrew]
+to install dependencies, but manual dependency installation is possible.
 
 [WARNING]
 .OS X Known Issues


[2/6] incubator-kudu git commit: Move the maintenance manager to util

Posted by mp...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.proto
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto
new file mode 100644
index 0000000..04b4519
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.proto
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package kudu;
+
+option java_package = "org.kududb";
+
+// Used to present the maintenance manager's internal state.
+message MaintenanceManagerStatusPB {
+  message MaintenanceOpPB {
+    required string name = 1;
+    // Number of times this operation is currently running.
+    required uint32 running = 2;
+    required bool runnable = 3;
+    required uint64 ram_anchored_bytes = 4;
+    required int64 logs_retained_bytes = 5;
+    required double perf_improvement = 6;
+  }
+
+  message CompletedOpPB {
+    required string name = 1;
+    required int32 duration_millis = 2;
+    // Number of seconds since this operation started.
+    required int32 secs_since_start = 3;
+  }
+
+  // The next operation that would run.
+  optional MaintenanceOpPB best_op = 1;
+
+  // List of all the operations.
+  repeated MaintenanceOpPB registered_operations = 2;
+
+  // This list isn't in order of anything. Can contain the same operation mutiple times.
+  repeated CompletedOpPB completed_operations = 3;
+}
\ No newline at end of file


[3/6] incubator-kudu git commit: Move the maintenance manager to util

Posted by mp...@apache.org.
Move the maintenance manager to util

This moves the maintenance manager to util, along with its required protobufs.
This move is required because we'll need a server-level MaintenanceManager to
run garbage collection on the ResultTracker.

Change-Id: Ifcf072d443ac3d069bda15b9dc0f8c442b9ac5c0
Reviewed-on: http://gerrit.cloudera.org:8080/3656
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/ee380939
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/ee380939
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/ee380939

Branch: refs/heads/master
Commit: ee38093949fd7ccb773025e6ec7e8e60daa31149
Parents: 257ba29
Author: dralves <dr...@apache.org>
Authored: Thu Jul 14 14:29:26 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 23:48:30 2016 +0000

----------------------------------------------------------------------
 .../full_stack-insert-scan-test.cc              |   2 +-
 src/kudu/master/master.cc                       |   2 +-
 src/kudu/tablet/CMakeLists.txt                  |   2 -
 src/kudu/tablet/maintenance_manager-test.cc     | 287 -------------
 src/kudu/tablet/maintenance_manager.cc          | 419 -------------------
 src/kudu/tablet/maintenance_manager.h           | 280 -------------
 src/kudu/tablet/tablet.cc                       |   2 +-
 src/kudu/tablet/tablet.proto                    |  29 --
 src/kudu/tablet/tablet_mm_ops.h                 |   2 +-
 src/kudu/tablet/tablet_peer-test.cc             |   2 +-
 src/kudu/tablet/tablet_peer_mm_ops.cc           |   2 +-
 src/kudu/tablet/tablet_peer_mm_ops.h            |   2 +-
 src/kudu/tserver/mini_tablet_server.cc          |   2 +-
 src/kudu/tserver/tablet_server-test-base.h      |   2 +-
 src/kudu/tserver/tablet_server.cc               |   2 +-
 src/kudu/tserver/tserver-path-handlers.cc       |   8 +-
 src/kudu/util/CMakeLists.txt                    |  17 +
 src/kudu/util/maintenance_manager-test.cc       | 286 +++++++++++++
 src/kudu/util/maintenance_manager.cc            | 415 ++++++++++++++++++
 src/kudu/util/maintenance_manager.h             | 277 ++++++++++++
 src/kudu/util/maintenance_manager.proto         |  48 +++
 21 files changed, 1057 insertions(+), 1031 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/integration-tests/full_stack-insert-scan-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index 78f2f9d..5a1a61b 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -37,7 +37,6 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/mini_cluster.h"
 #include "kudu/master/mini_master.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tablet/tablet_peer.h"
@@ -47,6 +46,7 @@
 #include "kudu/util/async_util.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/errno.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index adb9533..c42d7f0 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -36,9 +36,9 @@
 #include "kudu/rpc/service_if.h"
 #include "kudu/rpc/service_pool.h"
 #include "kudu/server/rpc_server.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt
index e7e94c9..c6baa67 100644
--- a/src/kudu/tablet/CMakeLists.txt
+++ b/src/kudu/tablet/CMakeLists.txt
@@ -33,7 +33,6 @@ set(TABLET_SRCS
   delta_key.cc
   diskrowset.cc
   lock_manager.cc
-  maintenance_manager.cc
   memrowset.cc
   multi_column_writer.cc
   mutation.cc
@@ -98,7 +97,6 @@ ADD_KUDU_TEST(cfile_set-test)
 ADD_KUDU_TEST(tablet-pushdown-test)
 ADD_KUDU_TEST(tablet-schema-test)
 ADD_KUDU_TEST(tablet_bootstrap-test)
-ADD_KUDU_TEST(maintenance_manager-test)
 ADD_KUDU_TEST(metadata-test)
 ADD_KUDU_TEST(mvcc-test)
 ADD_KUDU_TEST(compaction-test)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager-test.cc b/src/kudu/tablet/maintenance_manager-test.cc
deleted file mode 100644
index 663ed47..0000000
--- a/src/kudu/tablet/maintenance_manager-test.cc
+++ /dev/null
@@ -1,287 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/maintenance_manager.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/thread.h"
-
-using kudu::tablet::MaintenanceManagerStatusPB;
-using std::shared_ptr;
-using std::vector;
-using strings::Substitute;
-
-METRIC_DEFINE_entity(test);
-METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
-                           "Number of Maintenance Operations Running",
-                           kudu::MetricUnit::kMaintenanceOperations,
-                           "The number of background maintenance operations currently running.");
-METRIC_DEFINE_histogram(test, maintenance_op_duration,
-                        "Maintenance Operation Duration",
-                        kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
-
-namespace kudu {
-
-const int kHistorySize = 4;
-
-class MaintenanceManagerTest : public KuduTest {
- public:
-  MaintenanceManagerTest() {
-    test_tracker_ = MemTracker::CreateTracker(1000, "test");
-    MaintenanceManager::Options options;
-    options.num_threads = 2;
-    options.polling_interval_ms = 1;
-    options.history_size = kHistorySize;
-    options.parent_mem_tracker = test_tracker_;
-    manager_.reset(new MaintenanceManager(options));
-    manager_->Init();
-  }
-  ~MaintenanceManagerTest() {
-    manager_->Shutdown();
-  }
-
- protected:
-  shared_ptr<MemTracker> test_tracker_;
-  shared_ptr<MaintenanceManager> manager_;
-};
-
-// Just create the MaintenanceManager and then shut it down, to make sure
-// there are no race conditions there.
-TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
-}
-
-enum TestMaintenanceOpState {
-  OP_DISABLED,
-  OP_RUNNABLE,
-  OP_RUNNING,
-  OP_FINISHED,
-};
-
-class TestMaintenanceOp : public MaintenanceOp {
- public:
-  TestMaintenanceOp(const std::string& name,
-                    IOUsage io_usage,
-                    TestMaintenanceOpState state,
-                    const shared_ptr<MemTracker>& tracker)
-    : MaintenanceOp(name, io_usage),
-      state_change_cond_(&lock_),
-      state_(state),
-      consumption_(tracker, 500),
-      logs_retained_bytes_(0),
-      perf_improvement_(0),
-      metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
-      maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
-      maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)) {
-  }
-
-  virtual ~TestMaintenanceOp() {}
-
-  virtual bool Prepare() OVERRIDE {
-    std::lock_guard<Mutex> guard(lock_);
-    if (state_ != OP_RUNNABLE) {
-      return false;
-    }
-    state_ = OP_RUNNING;
-    state_change_cond_.Broadcast();
-    DLOG(INFO) << "Prepared op " << name();
-    return true;
-  }
-
-  virtual void Perform() OVERRIDE {
-    DLOG(INFO) << "Performing op " << name();
-    std::lock_guard<Mutex> guard(lock_);
-    CHECK_EQ(OP_RUNNING, state_);
-    state_ = OP_FINISHED;
-    state_change_cond_.Broadcast();
-  }
-
-  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
-    std::lock_guard<Mutex> guard(lock_);
-    stats->set_runnable(state_ == OP_RUNNABLE);
-    stats->set_ram_anchored(consumption_.consumption());
-    stats->set_logs_retained_bytes(logs_retained_bytes_);
-    stats->set_perf_improvement(perf_improvement_);
-  }
-
-  void Enable() {
-    std::lock_guard<Mutex> guard(lock_);
-    DCHECK((state_ == OP_DISABLED) || (state_ == OP_FINISHED));
-    state_ = OP_RUNNABLE;
-    state_change_cond_.Broadcast();
-  }
-
-  void WaitForState(TestMaintenanceOpState state) {
-    std::lock_guard<Mutex> guard(lock_);
-    while (true) {
-      if (state_ == state) {
-        return;
-      }
-      state_change_cond_.Wait();
-    }
-  }
-
-  bool WaitForStateWithTimeout(TestMaintenanceOpState state, int ms) {
-    MonoDelta to_wait = MonoDelta::FromMilliseconds(ms);
-    std::lock_guard<Mutex> guard(lock_);
-    while (true) {
-      if (state_ == state) {
-        return true;
-      }
-      if (!state_change_cond_.TimedWait(to_wait)) {
-        return false;
-      }
-    }
-  }
-
-  void set_ram_anchored(uint64_t ram_anchored) {
-    std::lock_guard<Mutex> guard(lock_);
-    consumption_.Reset(ram_anchored);
-  }
-
-  void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
-    std::lock_guard<Mutex> guard(lock_);
-    logs_retained_bytes_ = logs_retained_bytes;
-  }
-
-  void set_perf_improvement(uint64_t perf_improvement) {
-    std::lock_guard<Mutex> guard(lock_);
-    perf_improvement_ = perf_improvement;
-  }
-
-  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
-    return maintenance_op_duration_;
-  }
-
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
-    return maintenance_ops_running_;
-  }
-
- private:
-  Mutex lock_;
-  ConditionVariable state_change_cond_;
-  enum TestMaintenanceOpState state_;
-  ScopedTrackedConsumption consumption_;
-  uint64_t logs_retained_bytes_;
-  uint64_t perf_improvement_;
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-  scoped_refptr<Histogram> maintenance_op_duration_;
-  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
-};
-
-// Create an op and wait for it to start running.  Unregister it while it is
-// running and verify that UnregisterOp waits for it to finish before
-// proceeding.
-TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
-  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, OP_DISABLED, test_tracker_);
-  op1.set_ram_anchored(1001);
-  manager_->RegisterOp(&op1);
-  scoped_refptr<kudu::Thread> thread;
-  CHECK_OK(Thread::Create("TestThread", "TestRegisterUnregister",
-        boost::bind(&TestMaintenanceOp::Enable, &op1), &thread));
-  op1.WaitForState(OP_FINISHED);
-  manager_->UnregisterOp(&op1);
-  ThreadJoiner(thread.get()).Join();
-}
-
-// Test that we'll run an operation that doesn't improve performance when memory
-// pressure gets high.
-TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
-  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
-  op.set_ram_anchored(100);
-  manager_->RegisterOp(&op);
-
-  // At first, we don't want to run this, since there is no perf_improvement.
-  CHECK_EQ(false, op.WaitForStateWithTimeout(OP_FINISHED, 20));
-
-  // set the ram_anchored by the high mem op so high that we'll have to run it.
-  scoped_refptr<kudu::Thread> thread;
-  CHECK_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
-      boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
-  op.WaitForState(OP_FINISHED);
-  manager_->UnregisterOp(&op);
-  ThreadJoiner(thread.get()).Join();
-}
-
-// Test that ops are prioritized correctly when we add log retention.
-TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
-  manager_->Shutdown();
-
-  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, OP_RUNNABLE, test_tracker_);
-  op1.set_ram_anchored(0);
-  op1.set_logs_retained_bytes(100);
-
-  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
-  op2.set_ram_anchored(100);
-  op2.set_logs_retained_bytes(100);
-
-  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
-  op3.set_ram_anchored(200);
-  op3.set_logs_retained_bytes(100);
-
-  manager_->RegisterOp(&op1);
-  manager_->RegisterOp(&op2);
-  manager_->RegisterOp(&op3);
-
-  // We want to do the low IO op first since it clears up some log retention.
-  ASSERT_EQ(&op1, manager_->FindBestOp());
-
-  manager_->UnregisterOp(&op1);
-
-  // Low IO is taken care of, now we find the op clears the most log retention and ram.
-  ASSERT_EQ(&op3, manager_->FindBestOp());
-
-  manager_->UnregisterOp(&op3);
-
-  ASSERT_EQ(&op2, manager_->FindBestOp());
-
-  manager_->UnregisterOp(&op2);
-}
-
-// Test adding operations and make sure that the history of recently completed operations
-// is correct in that it wraps around and doesn't grow.
-TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
-  for (int i = 0; i < 5; i++) {
-    string name = Substitute("op$0", i);
-    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
-    op.set_perf_improvement(1);
-    op.set_ram_anchored(100);
-    manager_->RegisterOp(&op);
-
-    CHECK_EQ(true, op.WaitForStateWithTimeout(OP_FINISHED, 200));
-    manager_->UnregisterOp(&op);
-
-    MaintenanceManagerStatusPB status_pb;
-    manager_->GetMaintenanceManagerStatusDump(&status_pb);
-    // The size should be at most the history_size.
-    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
-    // See that we have the right name, even if we wrap around.
-    ASSERT_EQ(name, status_pb.completed_operations(i % 4).name());
-  }
-}
-
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager.cc b/src/kudu/tablet/maintenance_manager.cc
deleted file mode 100644
index b611e10..0000000
--- a/src/kudu/tablet/maintenance_manager.cc
+++ /dev/null
@@ -1,419 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/maintenance_manager.h"
-
-#include <gflags/gflags.h>
-#include <memory>
-#include <stdint.h>
-#include <string>
-#include <utility>
-
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/debug/trace_event.h"
-#include "kudu/util/debug/trace_logging.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/mem_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/thread.h"
-
-using std::pair;
-using std::shared_ptr;
-using strings::Substitute;
-
-DEFINE_int32(maintenance_manager_num_threads, 1,
-       "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
-       "reserved for emergency flushes. For spinning disks, the number of threads should "
-       "not be above the number of devices.");
-TAG_FLAG(maintenance_manager_num_threads, stable);
-
-DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
-       "Polling interval for the maintenance manager scheduler, "
-       "in milliseconds.");
-TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
-
-DEFINE_int32(maintenance_manager_history_size, 8,
-       "Number of completed operations the manager is keeping track of.");
-TAG_FLAG(maintenance_manager_history_size, hidden);
-
-DEFINE_bool(enable_maintenance_manager, true,
-       "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
-TAG_FLAG(enable_maintenance_manager, unsafe);
-
-namespace kudu {
-
-using kudu::tablet::MaintenanceManagerStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
-using kudu::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
-
-MaintenanceOpStats::MaintenanceOpStats() {
-  Clear();
-}
-
-void MaintenanceOpStats::Clear() {
-  valid_ = false;
-  runnable_ = false;
-  ram_anchored_ = 0;
-  logs_retained_bytes_ = 0;
-  perf_improvement_ = 0;
-}
-
-MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
-    : name_(std::move(name)), running_(0), io_usage_(io_usage) {}
-
-MaintenanceOp::~MaintenanceOp() {
-  CHECK(!manager_.get()) << "You must unregister the " << name_
-         << " Op before destroying it.";
-}
-
-void MaintenanceOp::Unregister() {
-  CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
-  manager_->UnregisterOp(this);
-}
-
-const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
-  0,
-  0,
-  0,
-  shared_ptr<MemTracker>(),
-};
-
-MaintenanceManager::MaintenanceManager(const Options& options)
-  : num_threads_(options.num_threads <= 0 ?
-      FLAGS_maintenance_manager_num_threads : options.num_threads),
-    cond_(&lock_),
-    shutdown_(false),
-    running_ops_(0),
-    polling_interval_ms_(options.polling_interval_ms <= 0 ?
-          FLAGS_maintenance_manager_polling_interval_ms :
-          options.polling_interval_ms),
-    completed_ops_count_(0),
-    parent_mem_tracker_(!options.parent_mem_tracker ?
-        MemTracker::GetRootTracker() : options.parent_mem_tracker) {
-  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
-               .set_max_threads(num_threads_).Build(&thread_pool_));
-  uint32_t history_size = options.history_size == 0 ?
-                          FLAGS_maintenance_manager_history_size :
-                          options.history_size;
-  completed_ops_.resize(history_size);
-}
-
-MaintenanceManager::~MaintenanceManager() {
-  Shutdown();
-}
-
-Status MaintenanceManager::Init() {
-  RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
-      boost::bind(&MaintenanceManager::RunSchedulerThread, this),
-      &monitor_thread_));
-  return Status::OK();
-}
-
-void MaintenanceManager::Shutdown() {
-  {
-    std::lock_guard<Mutex> guard(lock_);
-    if (shutdown_) {
-      return;
-    }
-    shutdown_ = true;
-    cond_.Broadcast();
-  }
-  if (monitor_thread_.get()) {
-    CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
-    monitor_thread_.reset();
-    thread_pool_->Shutdown();
-  }
-}
-
-void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
-  std::lock_guard<Mutex> guard(lock_);
-  CHECK(!op->manager_.get()) << "Tried to register " << op->name()
-          << ", but it was already registered.";
-  pair<OpMapTy::iterator, bool> val
-    (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
-  CHECK(val.second)
-      << "Tried to register " << op->name()
-      << ", but it already exists in ops_.";
-  op->manager_ = shared_from_this();
-  op->cond_.reset(new ConditionVariable(&lock_));
-  VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
-}
-
-void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
-  {
-    std::lock_guard<Mutex> guard(lock_);
-    CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
-          << ", but it is not currently registered with this maintenance manager.";
-    auto iter = ops_.find(op);
-    CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
-        << ", but it was never registered";
-    // While the op is running, wait for it to be finished.
-    if (iter->first->running_ > 0) {
-      VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
-            << "we can unregister it.";
-    }
-    while (iter->first->running_ > 0) {
-      op->cond_->Wait();
-      iter = ops_.find(op);
-      CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
-          << ", but another thread unregistered it while we were "
-          << "waiting for it to complete";
-    }
-    ops_.erase(iter);
-  }
-  LOG(INFO) << "Unregistered op " << op->name();
-  op->cond_.reset();
-  // Remove the op's shared_ptr reference to us.  This might 'delete this'.
-  op->manager_.reset();
-}
-
-void MaintenanceManager::RunSchedulerThread() {
-  MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
-
-  std::unique_lock<Mutex> guard(lock_);
-  while (true) {
-    // Loop until we are shutting down or it is time to run another op.
-    cond_.TimedWait(polling_interval);
-    if (shutdown_) {
-      VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
-      return;
-    }
-
-    // Find the best op.
-    MaintenanceOp* op = FindBestOp();
-    if (!op) {
-      VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
-      continue;
-    }
-
-    // Prepare the maintenance operation.
-    op->running_++;
-    running_ops_++;
-    guard.unlock();
-    bool ready = op->Prepare();
-    guard.lock();
-    if (!ready) {
-      LOG(INFO) << "Prepare failed for " << op->name()
-                << ".  Re-running scheduler.";
-      op->running_--;
-      op->cond_->Signal();
-      continue;
-    }
-
-    // Run the maintenance operation.
-    Status s = thread_pool_->SubmitFunc(boost::bind(
-          &MaintenanceManager::LaunchOp, this, op));
-    CHECK(s.ok());
-  }
-}
-
-// Finding the best operation goes through four filters:
-// - If there's an Op that we can run quickly that frees log retention, we run it.
-// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
-//   free), we run the Op with the highest RAM usage.
-// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
-//   qualify, then we run the one that also frees up the most RAM).
-// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
-//   performance the most.
-//
-// The reason it's done this way is that we want to prioritize limiting the amount of resources we
-// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
-// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
-//
-// In the third priority we're at a point where nothing's urgent and there's nothing we can run
-// quickly.
-// TODO We currently optimize for freeing log retention but we could consider having some sort of
-// sliding priority between log retention and RAM usage. For example, is an Op that frees
-// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
-// and 128MB of RAM? Maybe a more holistic approach would be better.
-MaintenanceOp* MaintenanceManager::FindBestOp() {
-  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
-
-  if (!FLAGS_enable_maintenance_manager) {
-    VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
-    return nullptr;
-  }
-  size_t free_threads = num_threads_ - running_ops_;
-  if (free_threads == 0) {
-    VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
-    return nullptr;
-  }
-
-  int64_t low_io_most_logs_retained_bytes = 0;
-  MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
-
-  uint64_t most_mem_anchored = 0;
-  MaintenanceOp* most_mem_anchored_op = nullptr;
-
-  int64_t most_logs_retained_bytes = 0;
-  int64_t most_logs_retained_bytes_ram_anchored = 0;
-  MaintenanceOp* most_logs_retained_bytes_op = nullptr;
-
-  double best_perf_improvement = 0;
-  MaintenanceOp* best_perf_improvement_op = nullptr;
-  for (OpMapTy::value_type &val : ops_) {
-    MaintenanceOp* op(val.first);
-    MaintenanceOpStats& stats(val.second);
-    // Update op stats.
-    stats.Clear();
-    op->UpdateStats(&stats);
-    if (!stats.valid() || !stats.runnable()) {
-      continue;
-    }
-    if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
-        op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
-      low_io_most_logs_retained_bytes_op = op;
-      low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
-    }
-
-    if (stats.ram_anchored() > most_mem_anchored) {
-      most_mem_anchored_op = op;
-      most_mem_anchored = stats.ram_anchored();
-    }
-    // We prioritize ops that can free more logs, but when it's the same we pick the one that
-    // also frees up the most memory.
-    if (stats.logs_retained_bytes() > 0 &&
-        (stats.logs_retained_bytes() > most_logs_retained_bytes ||
-            (stats.logs_retained_bytes() == most_logs_retained_bytes &&
-                stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
-      most_logs_retained_bytes_op = op;
-      most_logs_retained_bytes = stats.logs_retained_bytes();
-      most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
-    }
-    if ((!best_perf_improvement_op) ||
-        (stats.perf_improvement() > best_perf_improvement)) {
-      best_perf_improvement_op = op;
-      best_perf_improvement = stats.perf_improvement();
-    }
-  }
-
-  // Look at ops that we can run quickly that free up log retention.
-  if (low_io_most_logs_retained_bytes_op) {
-    if (low_io_most_logs_retained_bytes > 0) {
-      VLOG_AND_TRACE("maintenance", 1)
-                    << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
-                    << "because it can free up more logs "
-                    << "at " << low_io_most_logs_retained_bytes
-                    << " bytes with a low IO cost";
-      return low_io_most_logs_retained_bytes_op;
-    }
-  }
-
-  // Look at free memory. If it is dangerously low, we must select something
-  // that frees memory-- the op with the most anchored memory.
-  double capacity_pct;
-  if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
-    if (!most_mem_anchored_op) {
-      string msg = StringPrintf("we have exceeded our soft memory limit "
-          "(current capacity is %.2f%%).  However, there are no ops currently "
-          "runnable which would free memory.", capacity_pct);
-      LOG(INFO) << msg;
-      return nullptr;
-    }
-    VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
-            << "(current capacity is " << capacity_pct << "%).  Running the op "
-            << "which anchors the most memory: " << most_mem_anchored_op->name();
-    return most_mem_anchored_op;
-  }
-
-  if (most_logs_retained_bytes_op) {
-    VLOG_AND_TRACE("maintenance", 1)
-            << "Performing " << most_logs_retained_bytes_op->name() << ", "
-            << "because it can free up more logs " << "at " << most_logs_retained_bytes
-            << " bytes";
-    return most_logs_retained_bytes_op;
-  }
-
-  if (best_perf_improvement_op) {
-    if (best_perf_improvement > 0) {
-      VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
-                 << "because it had the best perf_improvement score, "
-                 << "at " << best_perf_improvement;
-      return best_perf_improvement_op;
-    }
-  }
-  return nullptr;
-}
-
-void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
-  MonoTime start_time(MonoTime::Now(MonoTime::FINE));
-  op->RunningGauge()->Increment();
-  LOG_TIMING(INFO, Substitute("running $0", op->name())) {
-    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
-                 "name", op->name());
-    op->Perform();
-  }
-  op->RunningGauge()->Decrement();
-  MonoTime end_time(MonoTime::Now(MonoTime::FINE));
-  MonoDelta delta(end_time.GetDeltaSince(start_time));
-  std::lock_guard<Mutex> guard(lock_);
-
-  CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
-  completed_op.name = op->name();
-  completed_op.duration = delta;
-  completed_op.start_mono_time = start_time;
-  completed_ops_count_++;
-
-  op->DurationHistogram()->Increment(delta.ToMilliseconds());
-
-  running_ops_--;
-  op->running_--;
-  op->cond_->Signal();
-}
-
-void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
-  DCHECK(out_pb != nullptr);
-  std::lock_guard<Mutex> guard(lock_);
-  MaintenanceOp* best_op = FindBestOp();
-  for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
-    MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
-    MaintenanceOp* op(val.first);
-    MaintenanceOpStats& stat(val.second);
-    op_pb->set_name(op->name());
-    op_pb->set_running(op->running());
-    if (stat.valid()) {
-      op_pb->set_runnable(stat.runnable());
-      op_pb->set_ram_anchored_bytes(stat.ram_anchored());
-      op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
-      op_pb->set_perf_improvement(stat.perf_improvement());
-    } else {
-      op_pb->set_runnable(false);
-      op_pb->set_ram_anchored_bytes(0);
-      op_pb->set_logs_retained_bytes(0);
-      op_pb->set_perf_improvement(0.0);
-    }
-
-    if (best_op == op) {
-      out_pb->mutable_best_op()->CopyFrom(*op_pb);
-    }
-  }
-
-  for (const CompletedOp& completed_op : completed_ops_) {
-    if (!completed_op.name.empty()) {
-      MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations();
-      completed_pb->set_name(completed_op.name);
-      completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds());
-
-      MonoDelta delta(MonoTime::Now(MonoTime::FINE).GetDeltaSince(completed_op.start_mono_time));
-      completed_pb->set_secs_since_start(delta.ToSeconds());
-    }
-  }
-}
-
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/maintenance_manager.h b/src/kudu/tablet/maintenance_manager.h
deleted file mode 100644
index 1d1a609..0000000
--- a/src/kudu/tablet/maintenance_manager.h
+++ /dev/null
@@ -1,280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#ifndef KUDU_TABLET_MAINTENANCE_MANAGER_H
-#define KUDU_TABLET_MAINTENANCE_MANAGER_H
-
-#include <stdint.h>
-
-#include <map>
-#include <memory>
-#include <set>
-#include <string>
-#include <vector>
-
-#include "kudu/gutil/macros.h"
-#include "kudu/tablet/mvcc.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/condition_variable.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/mutex.h"
-#include "kudu/util/countdown_latch.h"
-#include "kudu/util/thread.h"
-#include "kudu/util/threadpool.h"
-
-namespace kudu {
-
-template<class T>
-class AtomicGauge;
-class Histogram;
-class MaintenanceManager;
-class MemTracker;
-
-class MaintenanceOpStats {
- public:
-  MaintenanceOpStats();
-
-  // Zero all stats. They are invalid until the first setter is called.
-  void Clear();
-
-  bool runnable() const {
-    DCHECK(valid_);
-    return runnable_;
-  }
-
-  void set_runnable(bool runnable) {
-    UpdateLastModified();
-    runnable_ = runnable;
-  }
-
-  uint64_t ram_anchored() const {
-    DCHECK(valid_);
-    return ram_anchored_;
-  }
-
-  void set_ram_anchored(uint64_t ram_anchored) {
-    UpdateLastModified();
-    ram_anchored_ = ram_anchored;
-  }
-
-  int64_t logs_retained_bytes() const {
-    DCHECK(valid_);
-    return logs_retained_bytes_;
-  }
-
-  void set_logs_retained_bytes(int64_t logs_retained_bytes) {
-    UpdateLastModified();
-    logs_retained_bytes_ = logs_retained_bytes;
-  }
-
-  double perf_improvement() const {
-    DCHECK(valid_);
-    return perf_improvement_;
-  }
-
-  void set_perf_improvement(double perf_improvement) {
-    UpdateLastModified();
-    perf_improvement_ = perf_improvement;
-  }
-
-  const MonoTime& last_modified() const {
-    DCHECK(valid_);
-    return last_modified_;
-  }
-
-  bool valid() const {
-    return valid_;
-  }
-
- private:
-  void UpdateLastModified() {
-    valid_ = true;
-    last_modified_ = MonoTime::Now(MonoTime::FINE);
-  }
-
-  // True if these stats are valid.
-  bool valid_;
-
-  // True if this op can be run now.
-  bool runnable_;
-
-  // The approximate amount of memory that not doing this operation keeps
-  // around.  This number is used to decide when to start freeing memory, so it
-  // should be fairly accurate.  May be 0.
-  uint64_t ram_anchored_;
-
-  // The approximate amount of disk space that not doing this operation keeps us from GCing from
-  // the logs. May be 0.
-  int64_t logs_retained_bytes_;
-
-  // The estimated performance improvement-- how good it is to do this on some
-  // absolute scale (yet TBD).
-  double perf_improvement_;
-
-  // The last time that the stats were modified.
-  MonoTime last_modified_;
-};
-
-// MaintenanceOp objects represent background operations that the
-// MaintenanceManager can schedule.  Once a MaintenanceOp is registered, the
-// manager will periodically poll it for statistics.  The registrant is
-// responsible for managing the memory associated with the MaintenanceOp object.
-// Op objects should be unregistered before being de-allocated.
-class MaintenanceOp {
- public:
-  friend class MaintenanceManager;
-
-  // General indicator of how much IO the Op will use.
-  enum IOUsage {
-    LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
-    HIGH_IO_USAGE // Everything else.
-  };
-
-  explicit MaintenanceOp(std::string name, IOUsage io_usage);
-  virtual ~MaintenanceOp();
-
-  // Unregister this op, if it is currently registered.
-  void Unregister();
-
-  // Update the op statistics.  This will be called every scheduling period
-  // (about a few times a second), so it should not be too expensive.  It's
-  // possible for the returned statistics to be invalid; the caller should
-  // call MaintenanceOpStats::valid() before using them.  This will be run
-  // under the MaintenanceManager lock.
-  virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
-
-  // Prepare to perform the operation.  This will be run without holding the
-  // maintenance manager lock.  It should be short, since it is run from the
-  // context of the maintenance op scheduler thread rather than a worker thread.
-  // If this returns false, we will abort the operation.
-  virtual bool Prepare() = 0;
-
-  // Perform the operation.  This will be run without holding the maintenance
-  // manager lock, and may take a long time.
-  virtual void Perform() = 0;
-
-  // Returns the histogram for this op that tracks duration. Cannot be NULL.
-  virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
-
-  // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
-  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
-
-  uint32_t running() { return running_; }
-
-  std::string name() const { return name_; }
-
-  IOUsage io_usage() const { return io_usage_; }
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
-
-  // The name of the operation.  Op names must be unique.
-  const std::string name_;
-
-  // The number of times that this op is currently running.
-  uint32_t running_;
-
-  // Condition variable which the UnregisterOp function can wait on.
-  //
-  // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
-  // it only exists when the op is registered.
-  gscoped_ptr<ConditionVariable> cond_;
-
-  // The MaintenanceManager with which this op is registered, or null
-  // if it is not registered.
-  std::shared_ptr<MaintenanceManager> manager_;
-
-  IOUsage io_usage_;
-};
-
-struct MaintenanceOpComparator {
-  bool operator() (const MaintenanceOp* lhs,
-                   const MaintenanceOp* rhs) const {
-    return lhs->name().compare(rhs->name()) < 0;
-  }
-};
-
-// Holds the information regarding a recently completed operation.
-struct CompletedOp {
-  std::string name;
-  MonoDelta duration;
-  MonoTime start_mono_time;
-};
-
-// The MaintenanceManager manages the scheduling of background operations such
-// as flushes or compactions.  It runs these operations in the background, in a
-// thread pool.  It uses information provided in MaintenanceOpStats objects to
-// decide which operations, if any, to run.
-class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
- public:
-  struct Options {
-    int32_t num_threads;
-    int32_t polling_interval_ms;
-    uint32_t history_size;
-    std::shared_ptr<MemTracker> parent_mem_tracker;
-  };
-
-  explicit MaintenanceManager(const Options& options);
-  ~MaintenanceManager();
-
-  Status Init();
-  void Shutdown();
-
-  // Register an op with the manager.
-  void RegisterOp(MaintenanceOp* op);
-
-  // Unregister an op with the manager.
-  // If the Op is currently running, it will not be interrupted.  However, this
-  // function will block until the Op is finished.
-  void UnregisterOp(MaintenanceOp* op);
-
-  void GetMaintenanceManagerStatusDump(tablet::MaintenanceManagerStatusPB* out_pb);
-
-  static const Options DEFAULT_OPTIONS;
-
- private:
-  FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
-  typedef std::map<MaintenanceOp*, MaintenanceOpStats,
-          MaintenanceOpComparator> OpMapTy;
-
-  void RunSchedulerThread();
-
-  // find the best op, or null if there is nothing we want to run
-  MaintenanceOp* FindBestOp();
-
-  void LaunchOp(MaintenanceOp* op);
-
-  const int32_t num_threads_;
-  OpMapTy ops_; // registered operations
-  Mutex lock_;
-  scoped_refptr<kudu::Thread> monitor_thread_;
-  gscoped_ptr<ThreadPool> thread_pool_;
-  ConditionVariable cond_;
-  bool shutdown_;
-  uint64_t running_ops_;
-  int32_t polling_interval_ms_;
-  // Vector used as a circular buffer for recently completed ops. Elements need to be added at
-  // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
-  std::vector<CompletedOp> completed_ops_;
-  int64_t completed_ops_count_;
-  std::shared_ptr<MemTracker> parent_mem_tracker_;
-
-  DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
-};
-
-} // namespace kudu
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index ff63aef..383e739 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -45,7 +45,6 @@
 #include "kudu/tablet/compaction_policy.h"
 #include "kudu/tablet/delta_compaction.h"
 #include "kudu/tablet/diskrowset.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/row_op.h"
 #include "kudu/tablet/rowset_info.h"
 #include "kudu/tablet/rowset_tree.h"
@@ -61,6 +60,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/stopwatch.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.proto b/src/kudu/tablet/tablet.proto
index ae02e35..18ebe50 100644
--- a/src/kudu/tablet/tablet.proto
+++ b/src/kudu/tablet/tablet.proto
@@ -96,32 +96,3 @@ message TabletStatusPB {
   optional PartitionPB partition = 9;
   optional int64 estimated_on_disk_size = 7;
 }
-
-// Used to present the maintenance manager's internal state.
-message MaintenanceManagerStatusPB {
-  message MaintenanceOpPB {
-    required string name = 1;
-    // Number of times this operation is currently running.
-    required uint32 running = 2;
-    required bool runnable = 3;
-    required uint64 ram_anchored_bytes = 4;
-    required int64 logs_retained_bytes = 5;
-    required double perf_improvement = 6;
-  }
-
-  message CompletedOpPB {
-    required string name = 1;
-    required int32 duration_millis = 2;
-    // Number of seconds since this operation started.
-    required int32 secs_since_start = 3;
-  }
-
-  // The next operation that would run.
-  optional MaintenanceOpPB best_op = 1;
-
-  // List of all the operations.
-  repeated MaintenanceOpPB registered_operations = 2;
-
-  // This list isn't in order of anything. Can contain the same operation mutiple times.
-  repeated CompletedOpPB completed_operations = 3;
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index cecf443..f4ee1c3 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -18,7 +18,7 @@
 #ifndef KUDU_TABLET_TABLET_MM_OPS_H_
 #define KUDU_TABLET_TABLET_MM_OPS_H_
 
-#include "kudu/tablet/maintenance_manager.h"
+#include "kudu/util/maintenance_manager.h"
 
 namespace kudu {
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
index ca43117..b1c03ef 100644
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ b/src/kudu/tablet/tablet_peer-test.cc
@@ -32,7 +32,6 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/server/clock.h"
 #include "kudu/server/logical_clock.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/transactions/transaction.h"
 #include "kudu/tablet/transactions/transaction_driver.h"
 #include "kudu/tablet/transactions/write_transaction.h"
@@ -40,6 +39,7 @@
 #include "kudu/tablet/tablet_peer_mm_ops.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/test_macros.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
index 3520d1b..d116cda 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ b/src/kudu/tablet/tablet_peer_mm_ops.cc
@@ -24,9 +24,9 @@
 #include <string>
 
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 
 DEFINE_int32(flush_threshold_mb, 1024,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tablet/tablet_peer_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.h b/src/kudu/tablet/tablet_peer_mm_ops.h
index a4b475d..a985802 100644
--- a/src/kudu/tablet/tablet_peer_mm_ops.h
+++ b/src/kudu/tablet/tablet_peer_mm_ops.h
@@ -18,8 +18,8 @@
 #ifndef KUDU_TABLET_TABLET_PEER_MM_OPS_H_
 #define KUDU_TABLET_TABLET_PEER_MM_OPS_H_
 
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet_peer.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/stopwatch.h"
 
 namespace kudu {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/mini_tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.cc b/src/kudu/tserver/mini_tablet_server.cc
index 380946d..92f1ad2 100644
--- a/src/kudu/tserver/mini_tablet_server.cc
+++ b/src/kudu/tserver/mini_tablet_server.cc
@@ -31,12 +31,12 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/server/webserver.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 1d5022e..3bcf34a 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -40,7 +40,6 @@
 #include "kudu/rpc/messenger.h"
 #include "kudu/server/server_base.proxy.h"
 #include "kudu/tablet/local_tablet_writer.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -51,6 +50,7 @@
 #include "kudu/tserver/tserver_admin.proxy.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/test_graph.h"
 #include "kudu/util/test_util.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index ea3faad..d933491 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -27,13 +27,13 @@
 #include "kudu/rpc/service_if.h"
 #include "kudu/server/rpc_server.h"
 #include "kudu/server/webserver.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_service.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver-path-handlers.h"
 #include "kudu/tserver/remote_bootstrap_service.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/status.h"

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 67afcb6..6cad63f 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -32,13 +32,13 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/webui_util.h"
-#include "kudu/tablet/maintenance_manager.h"
 #include "kudu/tablet/tablet.pb.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/url-coding.h"
 
 using kudu::consensus::GetConsensusRole;
@@ -46,9 +46,9 @@ using kudu::consensus::CONSENSUS_CONFIG_COMMITTED;
 using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::RaftPeerPB;
 using kudu::consensus::TransactionStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB;
-using kudu::tablet::MaintenanceManagerStatusPB_CompletedOpPB;
-using kudu::tablet::MaintenanceManagerStatusPB_MaintenanceOpPB;
+using kudu::MaintenanceManagerStatusPB;
+using kudu::MaintenanceManagerStatusPB_CompletedOpPB;
+using kudu::MaintenanceManagerStatusPB_MaintenanceOpPB;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletPeer;
 using kudu::tablet::TabletStatusPB;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index 01f9b1a..a924f38 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -30,6 +30,20 @@ ADD_EXPORTABLE_LIBRARY(histogram_proto
   NONLINK_DEPS ${HISTOGRAM_PROTO_TGTS})
 
 #######################################
+# maintenance_manager_proto
+#######################################
+
+# Generates C++ code for maintenance_manager.proto (included by
+# kudu/util/maintenance_manager.h as maintenance_manager.pb.h) and builds it
+# into the maintenance_manager_proto library, which UTIL_LIBS links against.
+PROTOBUF_GENERATE_CPP(
+    MAINTENANCE_MANAGER_PROTO_SRCS MAINTENANCE_MANAGER_PROTO_HDRS MAINTENANCE_MANAGER_PROTO_TGTS
+    SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+    BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+    PROTO_FILES maintenance_manager.proto)
+ADD_EXPORTABLE_LIBRARY(maintenance_manager_proto
+    SRCS ${MAINTENANCE_MANAGER_PROTO_SRCS}
+    DEPS protobuf
+    NONLINK_DEPS ${MAINTENANCE_MANAGER_PROTO_TGTS})
+
+#######################################
 # pb_util_proto
 #######################################
 
@@ -118,6 +132,7 @@ set(UTIL_SRCS
   kernel_stack_watchdog.cc
   locks.cc
   logging.cc
+  maintenance_manager.cc
   malloc.cc
   memcmpable_varint.cc
   memory/arena.cc
@@ -181,6 +196,7 @@ set(UTIL_LIBS
   glog
   gutil
   histogram_proto
+  maintenance_manager_proto
   pb_util_proto
   protobuf
   version_info_proto
@@ -279,6 +295,7 @@ ADD_KUDU_TEST(interval_tree-test)
 ADD_KUDU_TEST(jsonreader-test)
 ADD_KUDU_TEST(knapsack_solver-test)
 ADD_KUDU_TEST(logging-test)
+ADD_KUDU_TEST(maintenance_manager-test)
 ADD_KUDU_TEST(map-util-test)
 ADD_KUDU_TEST(mem_tracker-test)
 ADD_KUDU_TEST(memcmpable_varint-test LABELS no_tsan)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
new file mode 100644
index 0000000..87023dc
--- /dev/null
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::vector;
+using strings::Substitute;
+
+// Metrics handed out by every TestMaintenanceOp (through DurationHistogram() and
+// RunningGauge()) so that MaintenanceManager::LaunchOp() has something to update.
+METRIC_DEFINE_entity(test);
+METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
+                           "Number of Maintenance Operations Running",
+                           kudu::MetricUnit::kMaintenanceOperations,
+                           "The number of background maintenance operations currently running.");
+// MaintenanceManager::LaunchOp() records op durations with MonoDelta::ToMilliseconds(),
+// so the histogram's unit is milliseconds, not seconds.
+METRIC_DEFINE_histogram(test, maintenance_op_duration,
+                        "Maintenance Operation Duration",
+                        kudu::MetricUnit::kMilliseconds, "", 60000000LU, 2);
+
+namespace kudu {
+
+// Number of completed operations the manager under test remembers. It is kept
+// small so that TestCompletedOpsHistory can exercise wrap-around of the history.
+const int kHistorySize = 4;
+
+// Fixture owning a MaintenanceManager with two worker threads, a 1 ms polling
+// interval and a kHistorySize-long completed-op history. The manager's parent
+// MemTracker is 'test_tracker_' (created with a limit of 1000), which is also the
+// tracker each TestMaintenanceOp charges its anchored memory to, so tests can
+// create memory pressure by anchoring more than that limit in an op.
+class MaintenanceManagerTest : public KuduTest {
+ public:
+  MaintenanceManagerTest() {
+    test_tracker_ = MemTracker::CreateTracker(1000, "test");
+    MaintenanceManager::Options options;
+    options.num_threads = 2;
+    options.polling_interval_ms = 1;
+    options.history_size = kHistorySize;
+    options.parent_mem_tracker = test_tracker_;
+    manager_.reset(new MaintenanceManager(options));
+    // Init() starts the scheduler thread. If it failed silently, tests that wait
+    // for an op to run would hang instead of failing, so don't drop the Status.
+    CHECK_OK(manager_->Init());
+  }
+  ~MaintenanceManagerTest() {
+    manager_->Shutdown();
+  }
+
+ protected:
+  shared_ptr<MemTracker> test_tracker_;
+  shared_ptr<MaintenanceManager> manager_;
+};
+
+// Just create the MaintenanceManager and then shut it down, to make sure
+// there are no race conditions there.
+// The fixture constructor creates and starts the manager and the fixture
+// destructor shuts it down, so the test body is intentionally empty.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
+// Lifecycle of a TestMaintenanceOp:
+//   OP_DISABLED --Enable()--> OP_RUNNABLE --Prepare()--> OP_RUNNING --Perform()--> OP_FINISHED
+// Enable() may also move an OP_FINISHED op back to OP_RUNNABLE.
+enum TestMaintenanceOpState {
+  OP_DISABLED = 0,
+  OP_RUNNABLE = 1,
+  OP_RUNNING = 2,
+  OP_FINISHED = 3
+};
+
+// A MaintenanceOp whose lifecycle is driven explicitly by the test (see
+// TestMaintenanceOpState). Every state change happens under 'lock_' and is
+// broadcast on 'state_change_cond_' so that the Wait* helpers can observe it.
+class TestMaintenanceOp : public MaintenanceOp {
+ public:
+  // 'tracker' is charged for the memory this op reports as anchored; the charge
+  // starts at 500 and can be changed with set_ram_anchored().
+  TestMaintenanceOp(const std::string& name,
+                    IOUsage io_usage,
+                    TestMaintenanceOpState state,
+                    const shared_ptr<MemTracker>& tracker)
+    : MaintenanceOp(name, io_usage),
+      state_change_cond_(&lock_),
+      state_(state),
+      consumption_(tracker, 500),
+      logs_retained_bytes_(0),
+      perf_improvement_(0),
+      metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
+      maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
+      maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)) {
+  }
+
+  virtual ~TestMaintenanceOp() {}
+
+  // Agrees to run only when OP_RUNNABLE, and then moves to OP_RUNNING.
+  virtual bool Prepare() OVERRIDE {
+    std::lock_guard<Mutex> guard(lock_);
+    if (state_ != OP_RUNNABLE) {
+      return false;
+    }
+    state_ = OP_RUNNING;
+    state_change_cond_.Broadcast();
+    DLOG(INFO) << "Prepared op " << name();
+    return true;
+  }
+
+  // Must follow a successful Prepare(); moves the op to OP_FINISHED.
+  virtual void Perform() OVERRIDE {
+    DLOG(INFO) << "Performing op " << name();
+    std::lock_guard<Mutex> guard(lock_);
+    CHECK_EQ(OP_RUNNING, state_);
+    state_ = OP_FINISHED;
+    state_change_cond_.Broadcast();
+  }
+
+  // Reports the op as runnable only while OP_RUNNABLE; the remaining stats are
+  // whatever the test configured through the setters below.
+  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+    std::lock_guard<Mutex> guard(lock_);
+    stats->set_runnable(state_ == OP_RUNNABLE);
+    stats->set_ram_anchored(consumption_.consumption());
+    stats->set_logs_retained_bytes(logs_retained_bytes_);
+    stats->set_perf_improvement(perf_improvement_);
+  }
+
+  // Makes an OP_DISABLED or OP_FINISHED op eligible to be scheduled again.
+  void Enable() {
+    std::lock_guard<Mutex> guard(lock_);
+    DCHECK((state_ == OP_DISABLED) || (state_ == OP_FINISHED));
+    state_ = OP_RUNNABLE;
+    state_change_cond_.Broadcast();
+  }
+
+  // Blocks (without a timeout) until the op reaches 'state'.
+  void WaitForState(TestMaintenanceOpState state) {
+    std::lock_guard<Mutex> guard(lock_);
+    while (true) {
+      if (state_ == state) {
+        return;
+      }
+      state_change_cond_.Wait();
+    }
+  }
+
+  // Blocks until the op reaches 'state' or about 'ms' milliseconds have passed in
+  // total. Returns true iff the state was reached.
+  bool WaitForStateWithTimeout(TestMaintenanceOpState state, int ms) {
+    MonoTime start = MonoTime::Now(MonoTime::FINE);
+    std::lock_guard<Mutex> guard(lock_);
+    while (state_ != state) {
+      // Every state change broadcasts, so we may be woken for a transition we don't
+      // care about. Wait only for what is left of the budget; re-arming the full
+      // timeout on each wakeup could stretch the total wait far beyond 'ms'.
+      int64_t elapsed_ms = MonoTime::Now(MonoTime::FINE).GetDeltaSince(start).ToMilliseconds();
+      int64_t remaining_ms = ms - elapsed_ms;
+      if (remaining_ms <= 0) {
+        return false;
+      }
+      if (!state_change_cond_.TimedWait(MonoDelta::FromMilliseconds(remaining_ms))) {
+        // Timed out, but the state may have changed before we reacquired the lock.
+        return state_ == state;
+      }
+    }
+    return true;
+  }
+
+  void set_ram_anchored(uint64_t ram_anchored) {
+    std::lock_guard<Mutex> guard(lock_);
+    consumption_.Reset(ram_anchored);
+  }
+
+  void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
+    std::lock_guard<Mutex> guard(lock_);
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  void set_perf_improvement(uint64_t perf_improvement) {
+    std::lock_guard<Mutex> guard(lock_);
+    perf_improvement_ = perf_improvement;
+  }
+
+  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
+    return maintenance_op_duration_;
+  }
+
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
+    return maintenance_ops_running_;
+  }
+
+ private:
+  Mutex lock_;
+  ConditionVariable state_change_cond_;
+  enum TestMaintenanceOpState state_;
+  ScopedTrackedConsumption consumption_;
+  uint64_t logs_retained_bytes_;
+  uint64_t perf_improvement_;
+  // Declared before 'metric_entity_' because the constructor instantiates the
+  // entity in this registry.
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<Histogram> maintenance_op_duration_;
+  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+};
+
+// Register a disabled op, enable it from another thread and wait until its
+// Perform() has run, then unregister it. Perform() marks the op OP_FINISHED
+// before MaintenanceManager::LaunchOp() has released it, so UnregisterOp() may
+// still have to wait for the op to stop running before it can proceed.
+TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE, OP_DISABLED, test_tracker_);
+  // Anchor more than the fixture tracker's limit (1000): the op reports no perf
+  // improvement, so presumably it is the memory-pressure path that schedules it.
+  op1.set_ram_anchored(1001);
+  manager_->RegisterOp(&op1);
+  scoped_refptr<kudu::Thread> thread;
+  CHECK_OK(Thread::Create("TestThread", "TestRegisterUnregister",
+        boost::bind(&TestMaintenanceOp::Enable, &op1), &thread));
+  op1.WaitForState(OP_FINISHED);
+  manager_->UnregisterOp(&op1);
+  // Join() reports failure through its Status; don't drop it.
+  CHECK_OK(ThreadJoiner(thread.get()).Join());
+}
+
+// Test that we'll run an operation that doesn't improve performance when memory
+// pressure gets high.
+TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+  op.set_ram_anchored(100);
+  manager_->RegisterOp(&op);
+
+  // At first, we don't want to run this, since there is no perf_improvement.
+  CHECK_EQ(false, op.WaitForStateWithTimeout(OP_FINISHED, 20));
+
+  // Raise the op's anchored memory above the fixture tracker's limit (1000) so
+  // that the manager has to run it.
+  scoped_refptr<kudu::Thread> thread;
+  CHECK_OK(Thread::Create("TestThread", "MaintenanceManagerTest",
+      boost::bind(&TestMaintenanceOp::set_ram_anchored, &op, 1100), &thread));
+  op.WaitForState(OP_FINISHED);
+  manager_->UnregisterOp(&op);
+  // Join() reports failure through its Status; don't drop it.
+  CHECK_OK(ThreadJoiner(thread.get()).Join());
+}
+
+// Test that ops are prioritized correctly when we add log retention.
+TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
+  // Stop the scheduler thread: this test drives FindBestOp() by hand, so nothing
+  // should be picked up and run in the background.
+  manager_->Shutdown();
+
+  // A cheap op that anchors no memory.
+  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE, OP_RUNNABLE, test_tracker_);
+  op1.set_ram_anchored(0);
+  op1.set_logs_retained_bytes(100);
+
+  // Two expensive ops retaining the same amount of log; op3 anchors more memory.
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+  op2.set_ram_anchored(100);
+  op2.set_logs_retained_bytes(100);
+
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+  op3.set_ram_anchored(200);
+  op3.set_logs_retained_bytes(100);
+
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+
+  // Expected picking order:
+  //  - op1 first: it is low IO and clears some log retention;
+  //  - then op3: log retention is tied, and it frees the most RAM;
+  //  - finally op2.
+  // Each op is unregistered once picked so that the next one can surface.
+  TestMaintenanceOp* const expected_order[] = { &op1, &op3, &op2 };
+  for (TestMaintenanceOp* expected : expected_order) {
+    ASSERT_EQ(expected, manager_->FindBestOp());
+    manager_->UnregisterOp(expected);
+  }
+}
+
+// Test adding operations and make sure that the history of recently completed operations
+// is correct in that it wraps around and doesn't grow.
+TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
+  // Run one more op than the history can hold so that it has to wrap around.
+  for (int i = 0; i < kHistorySize + 1; i++) {
+    string name = Substitute("op$0", i);
+    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE, OP_RUNNABLE, test_tracker_);
+    op.set_perf_improvement(1);
+    op.set_ram_anchored(100);
+    manager_->RegisterOp(&op);
+
+    CHECK_EQ(true, op.WaitForStateWithTimeout(OP_FINISHED, 200));
+    manager_->UnregisterOp(&op);
+
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    // The size should be at most the history_size.
+    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+    // See that we have the right name, even if we wrap around. The dump lists the
+    // history slots in storage order, so op i lands in slot i % kHistorySize.
+    ASSERT_EQ(name, status_pb.completed_operations(i % kHistorySize).name());
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
new file mode 100644
index 0000000..1060bf8
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.cc
@@ -0,0 +1,415 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/maintenance_manager.h"
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <stdint.h>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+
+using std::pair;
+using std::shared_ptr;
+using strings::Substitute;
+
+// Worker-thread count used when Options::num_threads is <= 0.
+// NOTE(review): nothing in this file reserves a thread for emergency flushes;
+// FindBestOp() only checks that at least one worker is free. Confirm the help
+// text below against the intended design.
+DEFINE_int32(maintenance_manager_num_threads, 1,
+       "Size of the maintenance manager thread pool. Beyond a value of '1', one thread is "
+       "reserved for emergency flushes. For spinning disks, the number of threads should "
+       "not be above the number of devices.");
+TAG_FLAG(maintenance_manager_num_threads, stable);
+
+// Scheduler polling interval used when Options::polling_interval_ms is <= 0.
+DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
+       "Polling interval for the maintenance manager scheduler, "
+       "in milliseconds.");
+TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
+
+// Completed-op history length used when Options::history_size is 0.
+DEFINE_int32(maintenance_manager_history_size, 8,
+       "Number of completed operations the manager is keeping track of.");
+TAG_FLAG(maintenance_manager_history_size, hidden);
+
+// Checked at the start of every FindBestOp() call; when false, no op is selected.
+DEFINE_bool(enable_maintenance_manager, true,
+       "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
+TAG_FLAG(enable_maintenance_manager, unsafe);
+
+namespace kudu {
+
+// New stats start out cleared, i.e. invalid until the first setter is called.
+MaintenanceOpStats::MaintenanceOpStats() { Clear(); }
+
+// Resets every stat to its zero value and marks the whole set as not yet valid.
+void MaintenanceOpStats::Clear() {
+  perf_improvement_ = 0;
+  logs_retained_bytes_ = 0;
+  ram_anchored_ = 0;
+  runnable_ = false;
+  valid_ = false;
+}
+
+// 'name' is taken by value and moved into place; a new op is not running.
+MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+    : name_(std::move(name)),
+      running_(0),
+      io_usage_(io_usage) {
+}
+
+// 'manager_' is cleared by MaintenanceManager::UnregisterOp(); destroying an op
+// that is still registered is a fatal error.
+MaintenanceOp::~MaintenanceOp() {
+  CHECK(!manager_.get()) << "You must unregister the " << name_
+         << " Op before destroying it.";
+}
+
+// Unregisters this op from the manager it was registered with. It is a fatal
+// error if the op is not currently registered.
+void MaintenanceOp::Unregister() {
+  CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
+  manager_->UnregisterOp(this);
+}
+
+// Default options: zero for every numeric field and a null mem tracker. The
+// MaintenanceManager constructor treats these as "fall back to the matching gflag"
+// (and, for the mem tracker, to the root tracker).
+// NOTE(review): these initializers are positional and rely on the field order
+// declared for Options in maintenance_manager.h.
+const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
+  0,
+  0,
+  0,
+  shared_ptr<MemTracker>(),
+};
+
+// Numeric options <= 0 (or == 0 for history_size) and a null parent tracker fall
+// back to the corresponding gflag and to the root MemTracker respectively. The
+// worker thread pool is built here; the scheduler thread is started by Init().
+MaintenanceManager::MaintenanceManager(const Options& options)
+  : num_threads_(options.num_threads <= 0 ?
+      FLAGS_maintenance_manager_num_threads : options.num_threads),
+    cond_(&lock_),
+    shutdown_(false),
+    running_ops_(0),
+    polling_interval_ms_(options.polling_interval_ms <= 0 ?
+          FLAGS_maintenance_manager_polling_interval_ms :
+          options.polling_interval_ms),
+    completed_ops_count_(0),
+    parent_mem_tracker_(!options.parent_mem_tracker ?
+        MemTracker::GetRootTracker() : options.parent_mem_tracker) {
+  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
+               .set_max_threads(num_threads_).Build(&thread_pool_));
+  // Widen both branches to a signed 64-bit value so that a negative flag is caught
+  // below instead of wrapping around to an enormous unsigned size.
+  int64_t history_size = options.history_size == 0 ?
+                         static_cast<int64_t>(FLAGS_maintenance_manager_history_size) :
+                         static_cast<int64_t>(options.history_size);
+  // LaunchOp() indexes this ring with 'completed_ops_count_ % completed_ops_.size()',
+  // so an empty history would be a division by zero.
+  CHECK_GT(history_size, 0) << "Maintenance manager history size must be positive";
+  completed_ops_.resize(history_size);
+}
+
+// Destruction implies shutdown; Shutdown() returns early if it already ran.
+MaintenanceManager::~MaintenanceManager() { Shutdown(); }
+
+// Starts the background scheduler thread, which runs RunSchedulerThread() until
+// Shutdown() is called. Returns the thread-creation status as-is.
+Status MaintenanceManager::Init() {
+  return Thread::Create("maintenance", "maintenance_scheduler",
+                        boost::bind(&MaintenanceManager::RunSchedulerThread, this),
+                        &monitor_thread_);
+}
+
+// Stops the scheduler thread and, when one was started, the worker pool as well.
+// Only the first call has any effect.
+void MaintenanceManager::Shutdown() {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    if (shutdown_) {
+      return;
+    }
+    shutdown_ = true;
+    // Wake the scheduler so it notices 'shutdown_' without waiting out its poll.
+    cond_.Broadcast();
+  }
+  if (!monitor_thread_.get()) {
+    // No scheduler thread was started, so there is nothing to join or shut down.
+    return;
+  }
+  CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
+  monitor_thread_.reset();
+  thread_pool_->Shutdown();
+}
+
+// Adds 'op' to the set the scheduler considers. The op then holds a shared
+// reference to this manager (released by UnregisterOp()) and a condition variable
+// bound to 'lock_' that UnregisterOp() waits on.
+void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+  std::lock_guard<Mutex> guard(lock_);
+  CHECK(!op->manager_.get()) << "Tried to register " << op->name()
+          << ", but it was already registered.";
+  auto val = ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats()));
+  CHECK(val.second)
+      << "Tried to register " << op->name()
+      << ", but it already exists in ops_.";
+  op->manager_ = shared_from_this();
+  op->cond_.reset(new ConditionVariable(&lock_));
+  VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
+}
+
+// Removes 'op' from this manager, first waiting for any in-flight Prepare() or
+// Perform() of it to finish. Afterwards the op no longer references the manager.
+void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
+          << ", but it is not currently registered with this maintenance manager.";
+    auto iter = ops_.find(op);
+    CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+        << ", but it was never registered";
+    if (iter->first->running_ > 0) {
+      VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
+            << "we can unregister it.";
+      // 'running_' is decremented and 'cond_' signalled under 'lock_' by the scheduler
+      // and LaunchOp(); re-look the op up after every wakeup.
+      do {
+        op->cond_->Wait();
+        iter = ops_.find(op);
+        CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+            << ", but another thread unregistered it while we were "
+            << "waiting for it to complete";
+      } while (iter->first->running_ > 0);
+    }
+    ops_.erase(iter);
+  }
+  LOG(INFO) << "Unregistered op " << op->name();
+  op->cond_.reset();
+  // Remove the op's shared_ptr reference to us.  This might 'delete this'.
+  op->manager_.reset();
+}
+
+// Body of the scheduler thread started by Init(). Every polling interval (or
+// sooner, when 'cond_' is signalled) it picks the best op, prepares it and hands
+// it to the thread pool. Returns once Shutdown() has set 'shutdown_'.
+void MaintenanceManager::RunSchedulerThread() {
+  MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
+
+  std::unique_lock<Mutex> guard(lock_);
+  while (true) {
+    // Loop until we are shutting down or it is time to run another op.
+    cond_.TimedWait(polling_interval);
+    if (shutdown_) {
+      VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
+      return;
+    }
+
+    // Find the best op.
+    MaintenanceOp* op = FindBestOp();
+    if (!op) {
+      VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
+      continue;
+    }
+
+    // Reserve a worker slot and mark the op as running before dropping the lock,
+    // so that UnregisterOp() waits for us while Prepare() is in progress.
+    op->running_++;
+    running_ops_++;
+    guard.unlock();
+    bool ready = op->Prepare();
+    guard.lock();
+    if (!ready) {
+      LOG(INFO) << "Prepare failed for " << op->name()
+                << ".  Re-running scheduler.";
+      // Undo both reservations. Leaving 'running_ops_' incremented would leak a
+      // worker slot on every failed Prepare(), until FindBestOp() finds no free
+      // thread and nothing is ever scheduled again.
+      op->running_--;
+      running_ops_--;
+      op->cond_->Signal();
+      continue;
+    }
+
+    // Run the maintenance operation. LaunchOp() releases both reservations when
+    // the op completes.
+    Status s = thread_pool_->SubmitFunc(boost::bind(
+          &MaintenanceManager::LaunchOp, this, op));
+    CHECK_OK(s);
+  }
+}
+
+// Finding the best operation goes through four filters:
+// - If there's an Op that we can run quickly that frees log retention, we run it.
+// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
+//   free), we run the Op with the highest RAM usage.
+//   (In the code below this is parent_mem_tracker_->AnySoftLimitExceeded(); the parent tracker
+//   defaults to the root tracker.)
+// - If there are Ops that retain logs, we run the one that has the highest retention (and if many
+//   qualify, then we run the one that also frees up the most RAM).
+// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
+//   performance the most.
+//
+// The reason it's done this way is that we want to prioritize limiting the amount of resources we
+// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
+// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
+//
+// In the third priority we're at a point where nothing's urgent and there's nothing we can run
+// quickly.
+// TODO We currently optimize for freeing log retention but we could consider having some sort of
+// sliding priority between log retention and RAM usage. For example, is an Op that frees
+// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
+// and 128MB of RAM? Maybe a more holistic approach would be better.
+//
+// Returns nullptr when the manager is disabled by flag, when no worker thread is free, or when no
+// op qualifies under any filter. As a side effect, every registered op's stats in 'ops_' are
+// refreshed via UpdateStats().
+// NOTE(review): the callers in this file hold 'lock_' while calling this (the test calls it only
+// after Shutdown()); it reads 'running_ops_' and mutates 'ops_' entries without locking itself.
+MaintenanceOp* MaintenanceManager::FindBestOp() {
+  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
+
+  if (!FLAGS_enable_maintenance_manager) {
+    VLOG_AND_TRACE("maintenance", 1) << "Maintenance manager is disabled. Doing nothing";
+    return nullptr;
+  }
+  // 'running_ops_' counts ops reserved by RunSchedulerThread() and not yet released by
+  // LaunchOp() (or by a failed Prepare()).
+  size_t free_threads = num_threads_ - running_ops_;
+  if (free_threads == 0) {
+    VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
+    return nullptr;
+  }
+
+  // Best candidate so far for each of the four filters described above.
+  int64_t low_io_most_logs_retained_bytes = 0;
+  MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
+
+  uint64_t most_mem_anchored = 0;
+  MaintenanceOp* most_mem_anchored_op = nullptr;
+
+  int64_t most_logs_retained_bytes = 0;
+  int64_t most_logs_retained_bytes_ram_anchored = 0;
+  MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+
+  double best_perf_improvement = 0;
+  MaintenanceOp* best_perf_improvement_op = nullptr;
+  for (OpMapTy::value_type &val : ops_) {
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stats(val.second);
+    // Update op stats.
+    // Stats are rebuilt from scratch: Clear() marks them invalid, so an op whose UpdateStats()
+    // sets nothing is skipped below, as is any op that reports itself as not runnable.
+    stats.Clear();
+    op->UpdateStats(&stats);
+    if (!stats.valid() || !stats.runnable()) {
+      continue;
+    }
+    if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+        op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
+      low_io_most_logs_retained_bytes_op = op;
+      low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+    }
+
+    if (stats.ram_anchored() > most_mem_anchored) {
+      most_mem_anchored_op = op;
+      most_mem_anchored = stats.ram_anchored();
+    }
+    // We prioritize ops that can free more logs, but when it's the same we pick the one that
+    // also frees up the most memory.
+    if (stats.logs_retained_bytes() > 0 &&
+        (stats.logs_retained_bytes() > most_logs_retained_bytes ||
+            (stats.logs_retained_bytes() == most_logs_retained_bytes &&
+                stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
+      most_logs_retained_bytes_op = op;
+      most_logs_retained_bytes = stats.logs_retained_bytes();
+      most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+    }
+    // The first runnable op is always recorded here, even with zero improvement; the '> 0'
+    // check after the loop rejects that case.
+    if ((!best_perf_improvement_op) ||
+        (stats.perf_improvement() > best_perf_improvement)) {
+      best_perf_improvement_op = op;
+      best_perf_improvement = stats.perf_improvement();
+    }
+  }
+
+  // Look at ops that we can run quickly that free up log retention.
+  if (low_io_most_logs_retained_bytes_op) {
+    if (low_io_most_logs_retained_bytes > 0) {
+      VLOG_AND_TRACE("maintenance", 1)
+                    << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
+                    << "because it can free up more logs "
+                    << "at " << low_io_most_logs_retained_bytes
+                    << " bytes with a low IO cost";
+      return low_io_most_logs_retained_bytes_op;
+    }
+  }
+
+  // Look at free memory. If it is dangerously low, we must select something
+  // that frees memory-- the op with the most anchored memory.
+  // Note: if no runnable op anchors memory, this returns nullptr outright rather than
+  // falling through to the log-retention and perf filters below.
+  double capacity_pct;
+  if (parent_mem_tracker_->AnySoftLimitExceeded(&capacity_pct)) {
+    if (!most_mem_anchored_op) {
+      string msg = StringPrintf("we have exceeded our soft memory limit "
+          "(current capacity is %.2f%%).  However, there are no ops currently "
+          "runnable which would free memory.", capacity_pct);
+      LOG(INFO) << msg;
+      return nullptr;
+    }
+    VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
+            << "(current capacity is " << capacity_pct << "%).  Running the op "
+            << "which anchors the most memory: " << most_mem_anchored_op->name();
+    return most_mem_anchored_op;
+  }
+
+  if (most_logs_retained_bytes_op) {
+    VLOG_AND_TRACE("maintenance", 1)
+            << "Performing " << most_logs_retained_bytes_op->name() << ", "
+            << "because it can free up more logs " << "at " << most_logs_retained_bytes
+            << " bytes";
+    return most_logs_retained_bytes_op;
+  }
+
+  if (best_perf_improvement_op) {
+    if (best_perf_improvement > 0) {
+      VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
+                 << "because it had the best perf_improvement score, "
+                 << "at " << best_perf_improvement;
+      return best_perf_improvement_op;
+    }
+  }
+  return nullptr;
+}
+
+// Runs 'op' on a thread-pool worker, then records it in the completed-op history
+// and releases the reservations RunSchedulerThread() took before submitting it.
+void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
+  const MonoTime start_time(MonoTime::Now(MonoTime::FINE));
+  op->RunningGauge()->Increment();
+  LOG_TIMING(INFO, Substitute("running $0", op->name())) {
+    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
+                 "name", op->name());
+    op->Perform();
+  }
+  op->RunningGauge()->Decrement();
+  const MonoDelta duration = MonoTime::Now(MonoTime::FINE).GetDeltaSince(start_time);
+
+  std::lock_guard<Mutex> guard(lock_);
+  // 'completed_ops_' is a fixed-size ring; overwrite the oldest slot.
+  const size_t slot = completed_ops_count_ % completed_ops_.size();
+  CompletedOp& completed_op = completed_ops_[slot];
+  completed_op.name = op->name();
+  completed_op.duration = duration;
+  completed_op.start_mono_time = start_time;
+  completed_ops_count_++;
+
+  op->DurationHistogram()->Increment(duration.ToMilliseconds());
+
+  // Release the worker slot and the op, and wake a pending UnregisterOp(), if any.
+  running_ops_--;
+  op->running_--;
+  op->cond_->Signal();
+}
+
+// Fills 'out_pb' with every registered op and its stats (which this call refreshes
+// through FindBestOp()), the op that would be picked next, and the recorded history
+// of completed ops.
+void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
+  DCHECK(out_pb != nullptr);
+  std::lock_guard<Mutex> guard(lock_);
+  MaintenanceOp* best_op = FindBestOp();
+  for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stat(val.second);
+    MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
+    op_pb->set_name(op->name());
+    op_pb->set_running(op->running());
+    // Ops whose stats were never filled in are reported as not runnable, with zeros.
+    const bool valid = stat.valid();
+    op_pb->set_runnable(valid && stat.runnable());
+    op_pb->set_ram_anchored_bytes(valid ? stat.ram_anchored() : 0);
+    op_pb->set_logs_retained_bytes(valid ? stat.logs_retained_bytes() : 0);
+    op_pb->set_perf_improvement(valid ? stat.perf_improvement() : 0.0);
+
+    if (op == best_op) {
+      out_pb->mutable_best_op()->CopyFrom(*op_pb);
+    }
+  }
+
+  for (const CompletedOp& completed_op : completed_ops_) {
+    // Skip history slots with no name (presumably never written yet).
+    if (completed_op.name.empty()) {
+      continue;
+    }
+    MaintenanceManagerStatusPB_CompletedOpPB* completed_pb = out_pb->add_completed_operations();
+    completed_pb->set_name(completed_op.name);
+    completed_pb->set_duration_millis(completed_op.duration.ToMilliseconds());
+
+    MonoDelta since_start(
+        MonoTime::Now(MonoTime::FINE).GetDeltaSince(completed_op.start_mono_time));
+    completed_pb->set_secs_since_start(since_start.ToSeconds());
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/ee380939/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
new file mode 100644
index 0000000..ef3229e
--- /dev/null
+++ b/src/kudu/util/maintenance_manager.h
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Histogram;
+class MaintenanceManager;
+class MemTracker;
+
+// Statistics a MaintenanceOp reports when the MaintenanceManager polls it.
+// Every getter DCHECKs that the stats are valid; every setter marks them valid
+// and records the modification time.
+class MaintenanceOpStats {
+ public:
+  MaintenanceOpStats();
+
+  // Zero all stats. They are invalid until the first setter is called.
+  void Clear();
+
+  bool runnable() const {
+    DCHECK(valid_);
+    return runnable_;
+  }
+
+  void set_runnable(bool runnable) {
+    UpdateLastModified();
+    runnable_ = runnable;
+  }
+
+  uint64_t ram_anchored() const {
+    DCHECK(valid_);
+    return ram_anchored_;
+  }
+
+  void set_ram_anchored(uint64_t ram_anchored) {
+    UpdateLastModified();
+    ram_anchored_ = ram_anchored;
+  }
+
+  int64_t logs_retained_bytes() const {
+    DCHECK(valid_);
+    return logs_retained_bytes_;
+  }
+
+  void set_logs_retained_bytes(int64_t logs_retained_bytes) {
+    UpdateLastModified();
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  double perf_improvement() const {
+    DCHECK(valid_);
+    return perf_improvement_;
+  }
+
+  void set_perf_improvement(double perf_improvement) {
+    UpdateLastModified();
+    perf_improvement_ = perf_improvement;
+  }
+
+  const MonoTime& last_modified() const {
+    DCHECK(valid_);
+    return last_modified_;
+  }
+
+  bool valid() const {
+    return valid_;
+  }
+
+ private:
+  // Marks the stats valid and stamps the modification time; called by every setter.
+  void UpdateLastModified() {
+    valid_ = true;
+    last_modified_ = MonoTime::Now(MonoTime::FINE);
+  }
+
+  // The defaults below keep a default-constructed object well-defined (and
+  // invalid) even if the out-of-line constructor does not touch every field.
+
+  // True if these stats are valid.
+  bool valid_ = false;
+
+  // True if this op can be run now.
+  bool runnable_ = false;
+
+  // The approximate amount of memory that not doing this operation keeps
+  // around.  This number is used to decide when to start freeing memory, so it
+  // should be fairly accurate.  May be 0.
+  uint64_t ram_anchored_ = 0;
+
+  // The approximate amount of disk space that not doing this operation keeps us from GCing from
+  // the logs. May be 0.
+  int64_t logs_retained_bytes_ = 0;
+
+  // The estimated performance improvement-- how good it is to do this on some
+  // absolute scale (yet TBD).
+  double perf_improvement_ = 0.0;
+
+  // The last time that the stats were modified.
+  MonoTime last_modified_;
+};
+
+// MaintenanceOp objects represent background operations that the
+// MaintenanceManager can schedule.  Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics.  The registrant is
+// responsible for managing the memory associated with the MaintenanceOp object.
+// Op objects should be unregistered before being de-allocated.
+class MaintenanceOp {
+ public:
+  friend class MaintenanceManager;
+
+  // General indicator of how much IO the Op will use.
+  enum IOUsage {
+    LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
+    HIGH_IO_USAGE // Everything else.
+  };
+
+  explicit MaintenanceOp(std::string name, IOUsage io_usage);
+  virtual ~MaintenanceOp();
+
+  // Unregister this op, if it is currently registered.
+  void Unregister();
+
+  // Update the op statistics.  This will be called every scheduling period
+  // (about a few times a second), so it should not be too expensive.  It's
+  // possible for the returned statistics to be invalid; the caller should
+  // call MaintenanceOpStats::valid() before using them.  This will be run
+  // under the MaintenanceManager lock.
+  virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
+
+  // Prepare to perform the operation.  This will be run without holding the
+  // maintenance manager lock.  It should be short, since it is run from the
+  // context of the maintenance op scheduler thread rather than a worker thread.
+  // If this returns false, we will abort the operation.
+  virtual bool Prepare() = 0;
+
+  // Perform the operation.  This will be run without holding the maintenance
+  // manager lock, and may take a long time.
+  virtual void Perform() = 0;
+
+  // Returns the histogram for this op that tracks duration. Cannot be NULL.
+  virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
+
+  // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+
+  // Returns the number of times this op is currently running. A pure read, hence const.
+  uint32_t running() const { return running_; }
+
+  // Returns the op's (unique) name. Returned by reference rather than by value
+  // because MaintenanceOpComparator calls this for every comparison made by the
+  // manager's op map; callers that need their own copy still get one.
+  const std::string& name() const { return name_; }
+
+  // Returns the op's declared IO intensity.
+  IOUsage io_usage() const { return io_usage_; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
+
+  // The name of the operation.  Op names must be unique.
+  const std::string name_;
+
+  // The number of times that this op is currently running.
+  uint32_t running_;
+
+  // Condition variable which the UnregisterOp function can wait on.
+  //
+  // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
+  // it only exists when the op is registered.
+  gscoped_ptr<ConditionVariable> cond_;
+
+  // The MaintenanceManager with which this op is registered, or null
+  // if it is not registered.
+  std::shared_ptr<MaintenanceManager> manager_;
+
+  IOUsage io_usage_;
+};
+
+// Orders MaintenanceOp pointers by op name; the key comparator of MaintenanceManager's op map.
+// Op names must be unique, so distinct registered ops are expected never to compare equivalent.
+struct MaintenanceOpComparator {
+  bool operator()(const MaintenanceOp* lhs, const MaintenanceOp* rhs) const {
+    // std::string's operator< is specified as compare() < 0.
+    return lhs->name() < rhs->name();
+  }
+};
+
+// Holds the information regarding a recently completed operation.
+// NOTE(review): this is an aggregate, so reordering its members would silently
+// change the meaning of any brace-initialization of it elsewhere.
+struct CompletedOp {
+  // The op's name. GetMaintenanceManagerStatusDump skips entries whose name is
+  // empty (presumably buffer slots not yet filled).
+  std::string name;
+  // Reported as duration_millis in the status dump; presumably the op's run time.
+  MonoDelta duration;
+  // Monotonic start time; the status dump derives secs_since_start from it.
+  MonoTime start_mono_time;
+};
+
+// The MaintenanceManager manages the scheduling of background operations such
+// as flushes or compactions.  It runs these operations in the background, in a
+// thread pool.  It uses information provided in MaintenanceOpStats objects to
+// decide which operations, if any, to run.
+class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
+ public:
+  // Construction-time configuration. NOTE(review): field semantics below are
+  // inferred from names and member comments; confirm against the .cc.
+  struct Options {
+    // Presumably the number of worker threads in 'thread_pool_'.
+    int32_t num_threads;
+    // Presumably how often the scheduler thread polls registered ops.
+    int32_t polling_interval_ms;
+    // Presumably the capacity of the 'completed_ops_' circular buffer.
+    uint32_t history_size;
+    std::shared_ptr<MemTracker> parent_mem_tracker;
+  };
+
+  explicit MaintenanceManager(const Options& options);
+  ~MaintenanceManager();
+
+  Status Init();
+  void Shutdown();
+
+  // Register an op with the manager.
+  void RegisterOp(MaintenanceOp* op);
+
+  // Unregister an op with the manager.
+  // If the Op is currently running, it will not be interrupted.  However, this
+  // function will block until the Op is finished.
+  void UnregisterOp(MaintenanceOp* op);
+
+  // Fills 'out_pb' with the registered ops (and their latest stats), the op the
+  // scheduler would currently pick, and the recently completed ops.
+  void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+
+  // Default configuration; defined out-of-line.
+  static const Options DEFAULT_OPTIONS;
+
+ private:
+  FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+  // Registered ops mapped to their most recently polled stats, ordered by op name.
+  typedef std::map<MaintenanceOp*, MaintenanceOpStats,
+          MaintenanceOpComparator> OpMapTy;
+
+  void RunSchedulerThread();
+
+  // find the best op, or null if there is nothing we want to run
+  MaintenanceOp* FindBestOp();
+
+  void LaunchOp(MaintenanceOp* op);
+
+  // NOTE(review): member declaration order determines initialization order in
+  // the out-of-line constructor; do not reorder without checking the .cc.
+  const int32_t num_threads_;
+  OpMapTy ops_; // registered operations
+  // Held while reading ops_ and completed_ops_ in GetMaintenanceManagerStatusDump;
+  // also the mutex each registered op's 'cond_' is used with.
+  Mutex lock_;
+  scoped_refptr<kudu::Thread> monitor_thread_;
+  gscoped_ptr<ThreadPool> thread_pool_;
+  ConditionVariable cond_;
+  bool shutdown_;
+  // Presumably the number of ops currently executing; decremented as each op finishes.
+  uint64_t running_ops_;
+  int32_t polling_interval_ms_;
+  // Vector used as a circular buffer for recently completed ops. Elements need to be added at
+  // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
+  std::vector<CompletedOp> completed_ops_;
+  int64_t completed_ops_count_;
+  std::shared_ptr<MemTracker> parent_mem_tracker_;
+
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
+};
+
+} // namespace kudu



[5/6] incubator-kudu git commit: Integrate the result tracker with writes

Posted by mp...@apache.org.
Integrate the result tracker with writes

This patch integrates the result tracker with write transactions,
including:
- Support for caching responses on tablet bootstrap.
- Support for caching responses for follower transactions.
- Handling of races between client originated and (previous?) leader
  originated transactions.
- An explanation of the interaction between the result tracker
  and the transaction driver in transaction_driver.h.

For integration tests, this patch removes the check that tolerated
Status::AlreadyPresent() errors, which we had added because we didn't
yet support exactly-once semantics. Without that check (and without
exactly-once semantics), some tests like raft_consensus-itest would fail
100% of the time in slow mode, due to rows being inserted multiple times.

Because these tests would fail 100% of the time without a replay cache for
writes (so they already exercise it), and because testing it specifically is
involved, this patch doesn't include additional tests; these will be pushed as a follow-up.

Change-Id: I1fa2f8db33653960f4749237b8993baba0929893
Reviewed-on: http://gerrit.cloudera.org:8080/3449
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6d2679bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6d2679bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6d2679bd

Branch: refs/heads/master
Commit: 6d2679bd14fdf454f2b194c83cd90316776d4dfe
Parents: 0412fa7
Author: dralves <dr...@apache.org>
Authored: Fri Jul 8 20:36:30 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Fri Jul 15 00:10:44 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc                      |  3 +-
 src/kudu/consensus/consensus.proto              |  4 +
 src/kudu/integration-tests/test_workload.cc     | 12 ++-
 src/kudu/integration-tests/test_workload.h      | 10 ++
 src/kudu/master/sys_catalog.cc                  |  7 +-
 src/kudu/rpc/rpc_context.cc                     | 17 ++++
 src/kudu/rpc/rpc_context.h                      |  6 ++
 src/kudu/tablet/tablet_bootstrap-test.cc        |  1 +
 src/kudu/tablet/tablet_bootstrap.cc             | 60 ++++++++++--
 src/kudu/tablet/tablet_bootstrap.h              |  5 +
 src/kudu/tablet/tablet_peer-test.cc             |  3 +
 src/kudu/tablet/tablet_peer.cc                  | 17 +++-
 src/kudu/tablet/tablet_peer.h                   |  5 +
 src/kudu/tablet/transactions/transaction.h      | 35 +++++++
 .../tablet/transactions/transaction_driver.cc   | 98 +++++++++++++++++++-
 .../tablet/transactions/transaction_driver.h    | 93 +++++++++++++++++++
 .../tablet/transactions/write_transaction.cc    | 12 ++-
 .../tablet/transactions/write_transaction.h     | 11 ++-
 src/kudu/tools/insert-generated-rows.cc         |  6 +-
 src/kudu/tserver/CMakeLists.txt                 |  1 +
 .../tserver/remote_bootstrap_session-test.cc    |  6 +-
 src/kudu/tserver/tablet_service.cc              | 13 ++-
 src/kudu/tserver/ts_tablet_manager.cc           |  6 ++
 src/kudu/tserver/ts_tablet_manager.h            |  4 +
 src/kudu/tserver/tserver_service.proto          |  5 +-
 25 files changed, 399 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index e15d135..8b68c5b 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -299,8 +299,7 @@ string WriteRpc::ToString() const {
 }
 
 void WriteRpc::Try(RemoteTabletServer* replica, const ResponseCallback& callback) {
-  VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica "
-      << replica->ToString();
+  VLOG(2) << "Tablet " << tablet_id_ << ": Writing batch to replica " << replica->ToString();
   replica->proxy()->WriteAsync(req_, &resp_,
                                mutable_retrier()->mutable_controller(),
                                callback);

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 3439d5b..d8fb1d3 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -22,6 +22,7 @@ import "kudu/common/common.proto";
 import "kudu/common/wire_protocol.proto";
 import "kudu/consensus/metadata.proto";
 import "kudu/consensus/opid.proto";
+import "kudu/rpc/rpc_header.proto";
 import "kudu/tablet/metadata.proto";
 import "kudu/tablet/tablet.proto";
 import "kudu/tserver/tserver_admin.proto";
@@ -177,6 +178,9 @@ message ReplicateMsg {
   optional tserver.AlterSchemaRequestPB alter_schema_request = 6;
   optional ChangeConfigRecordPB change_config_record = 7;
 
+  // The client's request id for this message, if it is set.
+  optional rpc.RequestIdPB request_id = 8;
+
   optional NoOpRequestPB noop_request = 999;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 541647a..e3e83ad 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -58,6 +58,7 @@ TestWorkload::TestWorkload(ExternalMiniCluster* cluster)
     write_timeout_millis_(20000),
     timeout_allowed_(false),
     not_found_allowed_(false),
+    already_present_allowed_(false),
     num_replicas_(3),
     num_tablets_(1),
     table_name_(kDefaultTableName),
@@ -148,11 +149,12 @@ void TestWorkload::WriteThread() {
         if (not_found_allowed_ && e->status().IsNotFound()) {
           continue;
         }
-        // We don't handle write idempotency yet. (i.e making sure that when a leader fails
-        // writes to it that were eventually committed by the new leader but un-ackd to the
-        // client are not retried), so some errors are expected.
-        // It's OK as long as the errors are Status::AlreadyPresent();
-        CHECK(e->status().IsAlreadyPresent()) << "Unexpected error: " << e->status().ToString();
+
+        if (already_present_allowed_ && e->status().IsAlreadyPresent()) {
+          continue;
+        }
+
+        CHECK(e->status().ok()) << "Unexpected status: " << e->status().ToString();
       }
       inserted -= errors.size();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 0c271b9..fd3ec7e 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -78,6 +78,12 @@ class TestWorkload {
     not_found_allowed_ = allowed;
   }
 
+  // Controls whether the write threads tolerate per-row Status::AlreadyPresent()
+  // errors. When not allowed (the default), such an error causes a CHECK failure.
+  void set_already_present_allowed(bool allowed) {
+    this->already_present_allowed_ = allowed;
+  }
+
   void set_num_replicas(int r) {
     num_replicas_ = r;
   }
@@ -116,6 +122,9 @@ class TestWorkload {
 
   void set_write_pattern(WritePattern pattern) {
     write_pattern_ = pattern;
+    // Non-default patterns are assumed to write duplicate keys, which yields per-row
+    // AlreadyPresent() errors, so tolerate them. NOTE(review): this runs for every pattern,
+    // including INSERT_RANDOM_ROWS (the default), so any call to this setter tolerates them.
+    set_already_present_allowed(true);
   }
 
   // Sets up the internal client and creates the table which will be used for
@@ -156,6 +165,7 @@ class TestWorkload {
   int write_timeout_millis_;
   bool timeout_allowed_;
   bool not_found_allowed_;
+  bool already_present_allowed_;
   WritePattern write_pattern_ = INSERT_RANDOM_ROWS;
 
   int num_replicas_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index dba2d77..824561f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -266,6 +266,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
   RETURN_NOT_OK(BootstrapTablet(metadata,
                                 scoped_refptr<server::Clock>(master_->clock()),
                                 master_->mem_tracker(),
+                                scoped_refptr<rpc::ResultTracker>(),
                                 metric_registry_,
                                 tablet_peer_->status_listener(),
                                 &tablet,
@@ -279,6 +280,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
   RETURN_NOT_OK_PREPEND(tablet_peer_->Init(tablet,
                                            scoped_refptr<server::Clock>(master_->clock()),
                                            master_->messenger(),
+                                           scoped_refptr<rpc::ResultTracker>(),
                                            log,
                                            tablet->GetMetricEntity()),
                         "Failed to Init() TabletPeer");
@@ -330,7 +332,10 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB *req, WriteResponsePB *re
   gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
     new LatchTransactionCompletionCallback<WriteResponsePB>(&latch, resp));
   unique_ptr<tablet::WriteTransactionState> tx_state(
-      new tablet::WriteTransactionState(tablet_peer_.get(), req, resp));
+      new tablet::WriteTransactionState(tablet_peer_.get(),
+                                        req,
+                                        nullptr, // No RequestIdPB
+                                        resp));
   tx_state->set_completion_callback(std::move(txn_callback));
 
   RETURN_NOT_OK(tablet_peer_->SubmitWrite(std::move(tx_state)));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index c39818b..02ca2cc 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -68,6 +68,23 @@ void RpcContext::RespondSuccess() {
   }
 }
 
+// Responds without caching the result. Tracked calls are reported to the
+// ResultTracker as failed. Untracked calls are answered directly, after which
+// this RpcContext deletes itself and must not be touched again.
+void RpcContext::RespondNoCache() {
+  if (AreResultsTracked()) {
+    result_tracker_->FailAndRespond(call_->header().request_id(), response_pb_.get());
+    return;
+  }
+
+  const auto& resp = *response_pb_;
+  VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
+          << call_->ToString() << ": " << resp.DebugString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(resp),
+                         "trace", trace()->DumpToString());
+  // The call-specific error is carried inside the response protobuf itself, so the
+  // call is answered with RespondSuccess() rather than RespondFailure().
+  call_->RespondSuccess(resp);
+  delete this;
+}
+
 void RpcContext::RespondFailure(const Status &status) {
   if (AreResultsTracked()) {
     result_tracker_->FailAndRespond(call_->header().request_id(),

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index ec32e8b..2ee58b7 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -83,6 +83,12 @@ class RpcContext {
   // and response protobufs are also destroyed.
   void RespondSuccess();
 
+  // Like RespondSuccess(), but if results are being tracked, doesn't store the
+  // results of the service call.
+  // Used when a call-specific error was set on the response protobuf: the call
+  // should be considered failed, so its results shouldn't be cached.
+  void RespondNoCache();
+
   // Respond with an error to the client. This sends back an error with the code
   // ERROR_APPLICATION. Because there is no more specific error code passed back
   // to the client, most applications should create a custom error PB extension

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index 9713073..17de026 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -107,6 +107,7 @@ class BootstrapTest : public LogTestBase {
         meta,
         scoped_refptr<Clock>(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
         shared_ptr<MemTracker>(),
+        scoped_refptr<rpc::ResultTracker>(),
         NULL,
         listener.get(),
         tablet,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6395d4e..0fe9e09 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -41,6 +41,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/gutil/walltime.h"
+#include "kudu/rpc/result_tracker.h"
 #include "kudu/server/clock.h"
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/tablet/lock_manager.h"
@@ -97,6 +98,7 @@ using log::LogEntryPB;
 using log::LogOptions;
 using log::LogReader;
 using log::ReadableLogSegment;
+using rpc::ResultTracker;
 using server::Clock;
 using std::map;
 using std::shared_ptr;
@@ -106,6 +108,8 @@ using std::unordered_map;
 using strings::Substitute;
 using tserver::AlterSchemaRequestPB;
 using tserver::WriteRequestPB;
+using tserver::WriteResponsePB;
+
 
 struct ReplayState;
 
@@ -160,6 +164,7 @@ class TabletBootstrap {
   TabletBootstrap(const scoped_refptr<TabletMetadata>& meta,
                   const scoped_refptr<Clock>& clock,
                   shared_ptr<MemTracker> mem_tracker,
+                  const scoped_refptr<ResultTracker> result_tracker,
                   MetricRegistry* metric_registry,
                   TabletStatusListener* listener,
                   const scoped_refptr<LogAnchorRegistry>& log_anchor_registry);
@@ -239,10 +244,11 @@ class TabletBootstrap {
                            const TxResultPB& result,
                            const vector<bool>& already_flushed);
 
-  // Determine which of the operations from 'result' correspond to already-flushed
-  // stores.
-  Status DetermineFlushedOps(const TxResultPB& result,
-                             vector<bool>* flushed_by_op);
+  // Determine which of the operations from 'result' correspond to already-flushed stores.
+  // At the same time this builds the WriteResponsePB that we'll store on the ResultTracker.
+  Status DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
+                                             vector<bool>* flushed_by_op,
+                                             WriteResponsePB* response);
 
   // Pass through all of the decoded operations in tx_state. For
   // each op:
@@ -305,6 +311,7 @@ class TabletBootstrap {
   scoped_refptr<TabletMetadata> meta_;
   scoped_refptr<Clock> clock_;
   shared_ptr<MemTracker> mem_tracker_;
+  scoped_refptr<rpc::ResultTracker> result_tracker_;
   MetricRegistry* metric_registry_;
   TabletStatusListener* listener_;
   gscoped_ptr<tablet::Tablet> tablet_;
@@ -399,6 +406,7 @@ void TabletStatusListener::StatusMessage(const string& status) {
 Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
                        const scoped_refptr<Clock>& clock,
                        const shared_ptr<MemTracker>& mem_tracker,
+                       const scoped_refptr<ResultTracker>& result_tracker,
                        MetricRegistry* metric_registry,
                        TabletStatusListener* listener,
                        shared_ptr<tablet::Tablet>* rebuilt_tablet,
@@ -407,7 +415,7 @@ Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
                        ConsensusBootstrapInfo* consensus_info) {
   TRACE_EVENT1("tablet", "BootstrapTablet",
                "tablet_id", meta->tablet_id());
-  TabletBootstrap bootstrap(meta, clock, mem_tracker,
+  TabletBootstrap bootstrap(meta, clock, mem_tracker, result_tracker,
                             metric_registry, listener, log_anchor_registry);
   RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
   // This is necessary since OpenNewLog() initially disables sync.
@@ -436,11 +444,13 @@ static string DebugInfo(const string& tablet_id,
 TabletBootstrap::TabletBootstrap(
     const scoped_refptr<TabletMetadata>& meta,
     const scoped_refptr<Clock>& clock, shared_ptr<MemTracker> mem_tracker,
+    scoped_refptr<ResultTracker> result_tracker,  // Sink parameter, like mem_tracker.
     MetricRegistry* metric_registry, TabletStatusListener* listener,
     const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
     : meta_(meta),
       clock_(clock),
       mem_tracker_(std::move(mem_tracker)),
+      result_tracker_(std::move(result_tracker)),  // May be null: results are then not tracked.
       metric_registry_(metric_registry),
       listener_(listener),
       log_anchor_registry_(log_anchor_registry) {}
@@ -1182,13 +1192,19 @@ Status TabletBootstrap::AppendCommitMsg(const CommitMsg& commit_msg) {
   return log_->Append(&commit_entry);
 }
 
-Status TabletBootstrap::DetermineFlushedOps(const TxResultPB& result,
-                                            vector<bool>* flushed_by_op) {
+Status TabletBootstrap::DetermineFlushedOpsAndBuildResponse(const TxResultPB& result,
+                                                            vector<bool>* flushed_by_op,
+                                                            WriteResponsePB* response) {
   int num_ops = result.ops_size();
   flushed_by_op->resize(num_ops);
 
   for (int i = 0; i < num_ops; i++) {
     const auto& orig_op_result = result.ops(i);
+    if (orig_op_result.has_failed_status() && response) {
+      WriteResponsePB::PerRowErrorPB* error = response->add_per_row_errors();
+      error->set_row_index(i);
+      error->mutable_error()->CopyFrom(orig_op_result.failed_status());
+    }
     bool f;
     RETURN_NOT_OK(FilterOperation(orig_op_result, &f));
     (*flushed_by_op)[i] = f;
@@ -1217,12 +1233,40 @@ Status TabletBootstrap::PlayWriteRequest(ReplicateMsg* replicate_msg,
   tablet_->StartTransaction(&tx_state);
   tablet_->StartApplying(&tx_state);
 
+  unique_ptr<WriteResponsePB> response;
+
+  bool tracking_results = result_tracker_.get() != nullptr && replicate_msg->has_request_id();
+
+  // If the results are being tracked and this write has a request id, register
+  // it with the result tracker.
+  ResultTracker::RpcState state;
+  if (tracking_results) {
+    VLOG(1) << result_tracker_.get() << " Boostrapping request for tablet: "
+        << write->tablet_id() << ". State: " << 0 << " id: "
+        << replicate_msg->request_id().DebugString();
+    // We only replay committed requests so the result of tracking this request can be:
+    // NEW - This is a previously untracked request, or we changed the driver -> store the result
+    // COMPLETED - We've bootstrapped this tablet twice, and previously stored the result -> do
+    //             nothing.
+    state = result_tracker_->TrackRpcOrChangeDriver(replicate_msg->request_id());
+    CHECK(state == ResultTracker::RpcState::NEW || state == ResultTracker::RpcState::COMPLETED)
+        << "Wrong state: " << state;
+    response.reset(new WriteResponsePB());
+    response->set_timestamp(replicate_msg->timestamp());
+  }
+
   // Determine which of the operations are already flushed to persistent
   // storage and don't need to be re-applied. We can do this even before
   // we decode any row operations, so we can short-circuit that decoding
   // in the case that the entire op has been already flushed.
   vector<bool> already_flushed;
-  RETURN_NOT_OK(DetermineFlushedOps(commit_msg.result(), &already_flushed));
+  RETURN_NOT_OK(DetermineFlushedOpsAndBuildResponse(commit_msg.result(),
+                                                    &already_flushed,
+                                                    response.get()));
+
+  if (tracking_results && state == ResultTracker::NEW) {
+    result_tracker_->RecordCompletionAndRespond(replicate_msg->request_id(), response.get());
+  }
 
   bool all_already_flushed = std::all_of(already_flushed.begin(),
                                          already_flushed.end(),

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_bootstrap.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.h b/src/kudu/tablet/tablet_bootstrap.h
index 1566515..a7cdb6e 100644
--- a/src/kudu/tablet/tablet_bootstrap.h
+++ b/src/kudu/tablet/tablet_bootstrap.h
@@ -43,6 +43,10 @@ namespace consensus {
 struct ConsensusBootstrapInfo;
 } // namespace consensus
 
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
 namespace server {
 class Clock;
 }
@@ -94,6 +98,7 @@ extern const char* kLogRecoveryDir;
 Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
                        const scoped_refptr<server::Clock>& clock,
                        const std::shared_ptr<MemTracker>& mem_tracker,
+                       const scoped_refptr<rpc::ResultTracker>& result_tracker,
                        MetricRegistry* metric_registry,
                        TabletStatusListener* status_listener,
                        std::shared_ptr<Tablet>* rebuilt_tablet,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
index b1c03ef..8a04d1f 100644
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ b/src/kudu/tablet/tablet_peer-test.cc
@@ -138,6 +138,7 @@ class TabletPeerTest : public KuduTabletTest {
     ASSERT_OK(tablet_peer_->Init(tablet(),
                                  clock(),
                                  messenger_,
+                                 scoped_refptr<rpc::ResultTracker>(),
                                  log,
                                  metric_entity_));
   }
@@ -193,6 +194,7 @@ class TabletPeerTest : public KuduTabletTest {
     gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
     unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer,
                                                                          &req,
+                                                                         nullptr, // No RequestIdPB
                                                                          resp.get()));
 
     CountDownLatch rpc_latch(1);
@@ -464,6 +466,7 @@ TEST_F(TabletPeerTest, TestActiveTransactionPreventsLogGC) {
     ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
     unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer_.get(),
                                                                          req.get(),
+                                                                         nullptr, // No RequestIdPB
                                                                          resp.get()));
 
     tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
index 71bb522..44d3b30 100644
--- a/src/kudu/tablet/tablet_peer.cc
+++ b/src/kudu/tablet/tablet_peer.cc
@@ -36,6 +36,10 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/sysinfo.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_pool.h"
 #include "kudu/tablet/transactions/transaction_driver.h"
 #include "kudu/tablet/transactions/alter_schema_transaction.h"
 #include "kudu/tablet/transactions/write_transaction.h"
@@ -91,6 +95,7 @@ using consensus::WRITE_OP;
 using log::Log;
 using log::LogAnchorRegistry;
 using rpc::Messenger;
+using rpc::ResultTracker;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;
 
@@ -122,6 +127,7 @@ TabletPeer::~TabletPeer() {
 Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
                         const scoped_refptr<server::Clock>& clock,
                         const shared_ptr<Messenger>& messenger,
+                        const scoped_refptr<ResultTracker>& result_tracker,
                         const scoped_refptr<Log>& log,
                         const scoped_refptr<MetricEntity>& metric_entity) {
 
@@ -143,6 +149,7 @@ Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
     clock_ = clock;
     messenger_ = messenger;
     log_ = log;
+    result_tracker_ = result_tracker;
 
     ConsensusOptions options;
     options.tablet_id = meta_->tablet_id();
@@ -319,10 +326,12 @@ Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
 Status TabletPeer::SubmitWrite(unique_ptr<WriteTransactionState> state) {
   RETURN_NOT_OK(CheckRunning());
 
+  state->SetResultTracker(result_tracker_);
   gscoped_ptr<WriteTransaction> transaction(new WriteTransaction(std::move(state),
                                                                  consensus::LEADER));
   scoped_refptr<TransactionDriver> driver;
-  RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(), &driver));
+  RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
+                                           &driver));
   return driver->ExecuteAsync();
 }
 
@@ -483,7 +492,11 @@ Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>&
       DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
           " transaction must receive a WriteRequestPB";
       unique_ptr<WriteTransactionState> tx_state(
-          new WriteTransactionState(this, &replicate_msg->write_request()));
+          new WriteTransactionState(
+              this,
+              &replicate_msg->write_request(),
+              replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
+      tx_state->SetResultTracker(result_tracker_);
 
       transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
       break;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
index e926f79..2ced0cb 100644
--- a/src/kudu/tablet/tablet_peer.h
+++ b/src/kudu/tablet/tablet_peer.h
@@ -42,6 +42,7 @@ class LogAnchorRegistry;
 
 namespace rpc {
 class Messenger;
+class ResultTracker;
 }
 
 namespace tserver {
@@ -78,6 +79,7 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
   Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
               const scoped_refptr<server::Clock>& clock,
               const std::shared_ptr<rpc::Messenger>& messenger,
+              const scoped_refptr<rpc::ResultTracker>& result_tracker,
               const scoped_refptr<log::Log>& log,
               const scoped_refptr<MetricEntity>& metric_entity);
 
@@ -330,6 +332,9 @@ class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
   // can provide.
   std::vector<MaintenanceOp*> maintenance_ops_;
 
+  // The result tracker for writes.
+  scoped_refptr<rpc::ResultTracker> result_tracker_;
+
   DISALLOW_COPY_AND_ASSIGN(TabletPeer);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 0a133f5..29fcbee 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -20,6 +20,7 @@
 
 #include <string>
 #include <mutex>
+#include <kudu/rpc/result_tracker.h>
 
 #include "kudu/common/timestamp.h"
 #include "kudu/common/wire_protocol.h"
@@ -31,6 +32,10 @@
 
 namespace kudu {
 
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
 namespace tablet {
 class TabletPeer;
 class TransactionCompletionCallback;
@@ -137,6 +142,17 @@ class TransactionState {
   // This will only return a non-null object for leader-side transactions.
   virtual google::protobuf::Message* response() const { return NULL; }
 
+  // Returns whether the results of the transaction are being tracked.
+  bool are_results_tracked() const {
+    return result_tracker_.get() != nullptr && has_request_id();
+  }
+
+  rpc::ResultTracker* result_tracker() const { return result_tracker_.get(); }
+
+  void SetResultTracker(const scoped_refptr<rpc::ResultTracker> result_tracker) {
+    result_tracker_ = result_tracker;
+  }
+
   // Sets the ConsensusRound for this transaction, if this transaction is
   // being executed through the consensus system.
   void set_consensus_round(const scoped_refptr<consensus::ConsensusRound>& consensus_round) {
@@ -224,6 +240,19 @@ class TransactionState {
     return external_consistency_mode_;
   }
 
+  // Returns whether the transaction associated with this TransactionState has an
+  // associated request id.
+  bool has_request_id() const {
+    return request_id_.has_client_id();
+  }
+
+  // Returns the request id for the transaction associated with this TransactionState.
+  // Not all transactions will have a request id so users of this method should call
+  // 'has_request_id()' first to make sure it is set.
+  const rpc::RequestIdPB& request_id() const {
+    return request_id_;
+  }
+
  protected:
   explicit TransactionState(TabletPeer* tablet_peer);
   virtual ~TransactionState();
@@ -233,6 +262,9 @@ class TransactionState {
   // The tablet peer that is coordinating this transaction.
   TabletPeer* const tablet_peer_;
 
+  // The result tracker that will cache the result of this transaction.
+  scoped_refptr<rpc::ResultTracker> result_tracker_;
+
   // Optional callback to be called once the transaction completes.
   gscoped_ptr<TransactionCompletionCallback> completion_clbk_;
 
@@ -249,6 +281,9 @@ class TransactionState {
   // This OpId stores the canonical "anchor" OpId for this transaction.
   consensus::OpId op_id_;
 
+  // The client's request id for this transaction, if there is one.
+  rpc::RequestIdPB request_id_;
+
   scoped_refptr<consensus::ConsensusRound> consensus_round_;
 
   // The defined consistency mode for this transaction.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index fd68ca6..1b90564 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -21,6 +21,7 @@
 
 #include "kudu/consensus/consensus.h"
 #include "kudu/gutil/strings/strcat.h"
+#include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tablet/transactions/transaction_tracker.h"
 #include "kudu/util/debug-util.h"
@@ -39,10 +40,42 @@ using consensus::ReplicateMsg;
 using consensus::CommitMsg;
 using consensus::DriverType;
 using log::Log;
+using rpc::RequestIdPB;
+using rpc::ResultTracker;
 using std::shared_ptr;
 
 static const char* kTimestampFieldName = "timestamp";
 
+class FollowerTransactionCompletionCallback : public TransactionCompletionCallback {
+ public:
+  FollowerTransactionCompletionCallback(const RequestIdPB& request_id,
+                                        const google::protobuf::Message* response,
+                                        const scoped_refptr<ResultTracker>& result_tracker)
+      : request_id_(request_id),
+        response_(response),
+        result_tracker_(result_tracker) {}
+
+  virtual void TransactionCompleted() {
+    if (status_.ok()) {
+      result_tracker_->RecordCompletionAndRespond(request_id_, response_);
+    } else {
+      // For now we always respond with TOO_BUSY, meaning the client will retry (even if
+      // this is an unretryable failure). That works because the client-driven version of
+      // this transaction will get the right error.
+      result_tracker_->FailAndRespond(request_id_,
+                                      rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+                                      status_);
+    }
+  }
+
+  virtual ~FollowerTransactionCompletionCallback() {}
+
+ private:
+  const RequestIdPB& request_id_;
+  const google::protobuf::Message* response_;
+  scoped_refptr<ResultTracker> result_tracker_;
+};
+
 
 ////////////////////////////////////////////////////////////
 // TransactionDriver
@@ -79,6 +112,17 @@ Status TransactionDriver::Init(gscoped_ptr<Transaction> transaction,
     DCHECK(op_id_copy_.IsInitialized());
     replication_state_ = REPLICATING;
     replication_start_time_ = MonoTime::Now(MonoTime::FINE);
+    if (state()->are_results_tracked()) {
+      // If this is a follower transaction, make sure to set the transaction completion callback
+      // before the transaction has a chance to fail.
+      const rpc::RequestIdPB& request_id = state()->request_id();
+      const google::protobuf::Message* response = state()->response();
+      gscoped_ptr<TransactionCompletionCallback> callback(
+          new FollowerTransactionCompletionCallback(request_id,
+                                                    response,
+                                                    state()->result_tracker()));
+      mutable_state()->set_completion_callback(callback.Pass());
+    }
   } else {
     DCHECK_EQ(type, consensus::LEADER);
     gscoped_ptr<ReplicateMsg> replicate_msg;
@@ -162,16 +206,42 @@ void TransactionDriver::PrepareAndStartTask() {
   }
 }
 
+void TransactionDriver::RegisterFollowerTransactionOnResultTracker() {
+  // If this is a transaction being executed by a follower and its result is being
+  // tracked, make sure that we're the driver of the transaction.
+  if (!state()->are_results_tracked()) return;
+
+  ResultTracker::RpcState rpc_state = state()->result_tracker()->TrackRpcOrChangeDriver(
+      state()->request_id());
+  switch (rpc_state) {
+    case ResultTracker::RpcState::NEW:
+      // We're the only ones trying to execute the transaction (normal case). Proceed.
+      return;
+      // If this RPC was previously completed (e.g. if the same tablet was bootstrapped twice),
+      // stop tracking the result. Only follower transactions can observe this state, so we
+      // simply reset the callback and the result will no longer be tracked.
+    case ResultTracker::RpcState::COMPLETED: {
+      mutable_state()->set_completion_callback(
+          gscoped_ptr<TransactionCompletionCallback>(new TransactionCompletionCallback()));
+      VLOG(1) << state()->result_tracker() << " Follower Rpc was not NEW or IN_PROGRESS: "
+          << rpc_state << " OpId: " << state()->op_id().ShortDebugString()
+          << " RequestId: " << state()->request_id().ShortDebugString();
+      return;
+    }
+    default:
+      LOG(FATAL) << "Unexpected state: " << rpc_state;
+  }
+}
+
 Status TransactionDriver::PrepareAndStart() {
   TRACE_EVENT1("txn", "PrepareAndStart", "txn", this);
   VLOG_WITH_PREFIX(4) << "PrepareAndStart()";
   // Actually prepare and start the transaction.
   prepare_physical_timestamp_ = GetMonoTimeMicros();
-  RETURN_NOT_OK(transaction_->Prepare());
 
+  RETURN_NOT_OK(transaction_->Prepare());
   RETURN_NOT_OK(transaction_->Start());
 
-
   // Only take the lock long enough to take a local copy of the
   // replication state and set our prepare state. This ensures that
   // exactly one of Replicate/Prepare callbacks will trigger the apply
@@ -182,12 +252,30 @@ Status TransactionDriver::PrepareAndStart() {
     CHECK_EQ(prepare_state_, NOT_PREPARED);
     prepare_state_ = PREPARED;
     repl_state_copy = replication_state_;
+
+    // If this is a follower transaction we need to register the transaction on the tracker here,
+    // atomically with the change of the prepared state. Otherwise, if the prepare thread gets
+    // preempted after the state is prepared, apply can be triggered by another thread without the
+    // rpc being registered.
+    if (transaction_->type() == consensus::REPLICA) {
+      RegisterFollowerTransactionOnResultTracker();
+    // ... else we're a client-started transaction. Make sure we're still the driver of the
+    // RPC and give up if we aren't.
+    } else {
+      if (state()->are_results_tracked()
+          && !state()->result_tracker()->IsCurrentDriver(state()->request_id())) {
+        transaction_status_ = Status::AlreadyPresent(strings::Substitute(
+            "There's already an attempt of the same operation on the server for request id: $0",
+            state()->request_id().ShortDebugString()));
+        replication_state_ = REPLICATION_FAILED;
+        return transaction_status_;
+      }
+    }
   }
 
   switch (repl_state_copy) {
     case NOT_REPLICATING:
     {
-
       // Set the timestamp in the message, now that it's prepared.
       transaction_->state()->consensus_round()->replicate_msg()->set_timestamp(
           transaction_->state()->timestamp().ToUint64());
@@ -200,8 +288,8 @@ Status TransactionDriver::PrepareAndStart() {
         replication_state_ = REPLICATING;
         replication_start_time_ = MonoTime::Now(MonoTime::FINE);
       }
-      Status s = consensus_->Replicate(mutable_state()->consensus_round());
 
+      Status s = consensus_->Replicate(mutable_state()->consensus_round());
       if (PREDICT_FALSE(!s.ok())) {
         std::lock_guard<simple_spinlock> lock(lock_);
         CHECK_EQ(replication_state_, REPLICATING);
@@ -244,7 +332,6 @@ void TransactionDriver::HandleFailure(const Status& s) {
     repl_state_copy = replication_state_;
   }
 
-
   switch (repl_state_copy) {
     case NOT_REPLICATING:
     case REPLICATION_FAILED:
@@ -297,6 +384,7 @@ void TransactionDriver::ReplicationFinished(const Status& status) {
     replication_duration = replication_finished_time.GetDeltaSince(replication_start_time_);
   }
 
+
   TRACE_COUNTER_INCREMENT("replication_time_us", replication_duration.ToMicroseconds());
 
   // If we have prepared and replicated, we're ready

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/transaction_driver.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.h b/src/kudu/tablet/transactions/transaction_driver.h
index 874fd55..0501299 100644
--- a/src/kudu/tablet/transactions/transaction_driver.h
+++ b/src/kudu/tablet/transactions/transaction_driver.h
@@ -19,6 +19,7 @@
 #define KUDU_TABLET_TRANSACTION_DRIVER_H_
 
 #include <string>
+#include <kudu/rpc/result_tracker.h>
 
 #include "kudu/consensus/consensus.h"
 #include "kudu/gutil/ref_counted.h"
@@ -34,6 +35,10 @@ namespace log {
 class Log;
 } // namespace log
 
+namespace rpc {
+class ResultTracker;
+}
+
 namespace tablet {
 class TransactionOrderVerifier;
 class TransactionTracker;
@@ -89,6 +94,89 @@ class TransactionTracker;
 //
 // [1] - see 'Implementation Techniques for Main Memory Database Systems', DeWitt et. al.
 //
+// ===========================================================================================
+//
+// Tracking transaction results for exactly once semantics
+//
+// Exactly once semantics for transactions require that the results of previous executions
+// of a transaction be cached and replayed to the client, when a duplicate request is received.
+// For single server operations, this can be encapsulated in the rpc layer, but for replicated
+// ones, like transactions, it needs additional care, as multiple copies of an RPC can arrive
+// from different sources. For instance, a client might be retrying an operation on a different
+// tablet server, while that tablet server has also received the same operation from a previous
+// leader.
+//
+// The prepare phase of transactions is single threaded, so it's an ideal place to register
+// follower transactions with the result tracker and deduplicate possible multiple attempts.
+// However, RPCs from clients are registered as soon as they arrive, outside of the prepare
+// phase, and requests from another replica might arrive just as we're becoming leader, so we
+// need to account for all the possible interleavings.
+//
+// Relevant constraints:
+//
+// 1 - Before a replica becomes leader (i.e. accepts new client requests), it has already enqueued
+//     all current requests on the prepare queue.
+//
+// 2 - If a replica is not leader, it rejects client requests in the prepare phase.
+//
+// 3 - Replicated transactions, i.e. ones received as a follower of another (leader) replica, are
+//     registered with the result tracker in the prepare phase itself. This constrains the possible
+//     interleavings because when a client-originated request prepares, it will either observe a
+//     previous request and abort, or it won't observe it and know it is the first attempt at
+//     executing that transaction.
+//
+// Given these constraints the following interleavings might happen, on a newly elected leader:
+//
+// CR - Client Request
+// RR - Replica Request
+//
+//
+//             a) ---------                                  b) ---------
+//                    |                                             |
+//       RR prepares->1                                 CR arrives->1
+//                    |                                             |
+//                    2<-CR arrives                                 2<-RR prepares
+//                    |                                             |
+//                    3<-CR attaches                                3<-CR prepares/aborts
+//                    |                                             |
+//      RR completes->4                                 CR retries->4
+//                    |                                             |
+// CR replies cached->5                 CR attaches/replies cached->5
+//                    |                                             |
+//                ---------                                     ---------
+//
+// Case a) is the simplest. The client retries a request (2) after the duplicate request from the
+// previous leader has prepared (1). When trying to register the result in the ResultTracker (2)
+// the client's request will be "attached" (3) to the execution of the previously running request.
+// When it completes (4) the client will receive the cached response (5).
+//
+// Case b) is slightly more complex. When the client request arrives (1) the request from the
+// previous leader hasn't yet prepared, meaning that the client request will be marked as a NEW
+// request and its handler will proceed to try and prepare. Meanwhile, the replica request
+// prepares (2). Since the replica request must take precedence over the client's request, the
+// replica request must be the one to actually execute the operations, i.e. it must force the client
+// request's handler to abort (3).
+// It does this in two ways:
+//
+// - It immediately calls ResultTracker::TrackRpcOrChangeDriver(), this will make sure that it is
+//   registered as the handler driver and that it is the only one whose response will be stored.
+//
+// - Later on, when the handler for the client request finally tries to prepare (3), it will observe
+//   that it is no longer the driver of the transaction (its attempt_no has been replaced by
+//   the replica request's attempt_no) and will later call ResultTracker::FailAndRespond(),
+//   causing the client to retry later (4,5).
+//
+// After the client receives the error sent on (3), it retries the operation (4), which will then
+// attach to the replica request's execution and receive the same response when it is done (5).
+//
+// Additional notes:
+//
+// The above diagrams don't distinguish between arrive/prepare for replica requests as registering
+// with the result tracker and preparing in that case happen atomically.
+//
+// If the client request were to proceed to prepare _before_ the replica request prepared, it would
+// still be ok, as it would get aborted because the replica wasn't a leader yet (constraints 1/2).
+//
 // This class is thread safe.
 class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
 
@@ -212,6 +300,11 @@ class TransactionDriver : public RefCountedThreadSafe<TransactionDriver> {
   void SetResponseTimestamp(TransactionState* transaction_state,
                             const Timestamp& timestamp);
 
+  // If this driver is executing a follower transaction then it is possible
+  // it never went through the rpc system so we have to register it with the
+  // ResultTracker.
+  void RegisterFollowerTransactionOnResultTracker();
+
   TransactionTracker* const txn_tracker_;
   consensus::Consensus* const consensus_;
   log::Log* const log_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/write_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.cc b/src/kudu/tablet/transactions/write_transaction.cc
index df4bb33..6b4c0c6 100644
--- a/src/kudu/tablet/transactions/write_transaction.cc
+++ b/src/kudu/tablet/transactions/write_transaction.cc
@@ -65,6 +65,9 @@ void WriteTransaction::NewReplicateMsg(gscoped_ptr<ReplicateMsg>* replicate_msg)
   replicate_msg->reset(new ReplicateMsg);
   (*replicate_msg)->set_op_type(WRITE_OP);
   (*replicate_msg)->mutable_write_request()->CopyFrom(*state()->request());
+  if (state()->are_results_tracked()) {
+    (*replicate_msg)->mutable_request_id()->CopyFrom(state()->request_id());
+  }
 }
 
 Status WriteTransaction::Prepare() {
@@ -126,7 +129,7 @@ Status WriteTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
   // Add per-row errors to the result, update metrics.
   int i = 0;
   for (const RowOp* op : state()->row_ops()) {
-    if (state()->response() != nullptr && op->result->has_failed_status()) {
+    if (op->result->has_failed_status()) {
       // Replicas disregard the per row errors, for now
       // TODO check the per-row errors against the leader's, at least in debug mode
       WriteResponsePB::PerRowErrorPB* error = state()->response()->add_per_row_errors();
@@ -201,6 +204,7 @@ string WriteTransaction::ToString() const {
 
 WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
                                              const tserver::WriteRequestPB *request,
+                                             const rpc::RequestIdPB* request_id,
                                              tserver::WriteResponsePB *response)
   : TransactionState(tablet_peer),
     request_(DCHECK_NOTNULL(request)),
@@ -208,6 +212,12 @@ WriteTransactionState::WriteTransactionState(TabletPeer* tablet_peer,
     mvcc_tx_(nullptr),
     schema_at_decode_time_(nullptr) {
   external_consistency_mode_ = request_->external_consistency_mode();
+  if (!response_) {
+    response_ = &owned_response_;
+  }
+  if (request_id) {
+    request_id_ = *request_id;
+  }
 }
 
 void WriteTransactionState::SetMvccTxAndTimestamp(gscoped_ptr<ScopedTransaction> mvcc_tx) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tablet/transactions/write_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/write_transaction.h b/src/kudu/tablet/transactions/write_transaction.h
index 134c04b..05f84ff 100644
--- a/src/kudu/tablet/transactions/write_transaction.h
+++ b/src/kudu/tablet/transactions/write_transaction.h
@@ -74,6 +74,7 @@ class WriteTransactionState : public TransactionState {
  public:
   WriteTransactionState(TabletPeer* tablet_peer,
                         const tserver::WriteRequestPB *request,
+                        const rpc::RequestIdPB* request_id,
                         tserver::WriteResponsePB *response = NULL);
   virtual ~WriteTransactionState();
 
@@ -185,9 +186,13 @@ class WriteTransactionState : public TransactionState {
   // from the request).
   void ResetRpcFields();
 
-  // pointers to the rpc context, request and response, lifecyle
-  // is managed by the rpc subsystem. These pointers maybe NULL if the
-  // transaction was not initiated by an RPC call.
+  // An owned version of the response, for follower transactions.
+  tserver::WriteResponsePB owned_response_;
+
+  // Pointers to the request and response; neither is ever null. 'request_' is always set on
+  // construction and its lifecycle is not managed by this class. 'response_' points either to
+  // the response passed to the ctor, if there is one (not owned by this class), or to
+  // 'owned_response_' if there isn't.
   const tserver::WriteRequestPB* request_;
   tserver::WriteResponsePB* response_;
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tools/insert-generated-rows.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/insert-generated-rows.cc b/src/kudu/tools/insert-generated-rows.cc
index 8e59857..f0b0b1b 100644
--- a/src/kudu/tools/insert-generated-rows.cc
+++ b/src/kudu/tools/insert-generated-rows.cc
@@ -107,11 +107,7 @@ static int WriteRandomDataToTable(int argc, char** argv) {
       session->GetPendingErrors(&errors, &overflow);
       CHECK(!overflow);
       for (const client::KuduError* e : errors) {
-        if (e->status().IsAlreadyPresent()) {
-          LOG(WARNING) << "Ignoring insert error: " << e->status().ToString();
-        } else {
-          LOG(FATAL) << "Unexpected insert error: " << e->status().ToString();
-        }
+        LOG(FATAL) << "Unexpected insert error: " << e->status().ToString();
       }
       continue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/CMakeLists.txt b/src/kudu/tserver/CMakeLists.txt
index 6b104b2..8568fea 100644
--- a/src/kudu/tserver/CMakeLists.txt
+++ b/src/kudu/tserver/CMakeLists.txt
@@ -47,6 +47,7 @@ PROTOBUF_GENERATE_CPP(
   PROTO_FILES tserver.proto)
 set(TSERVER_PROTO_LIBS
   kudu_common_proto
+  krpc
   consensus_metadata_proto
   tablet_proto
   wire_protocol_proto)

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/remote_bootstrap_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/remote_bootstrap_session-test.cc b/src/kudu/tserver/remote_bootstrap_session-test.cc
index bd34df8..d354699 100644
--- a/src/kudu/tserver/remote_bootstrap_session-test.cc
+++ b/src/kudu/tserver/remote_bootstrap_session-test.cc
@@ -133,6 +133,7 @@ class RemoteBootstrapTest : public KuduTabletTest {
     ASSERT_OK(tablet_peer_->Init(tablet(),
                                  clock(),
                                  messenger,
+                                 scoped_refptr<rpc::ResultTracker>(),
                                  log,
                                  metric_entity));
     consensus::ConsensusBootstrapInfo boot_info;
@@ -163,7 +164,10 @@ class RemoteBootstrapTest : public KuduTabletTest {
       CountDownLatch latch(1);
 
       unique_ptr<tablet::WriteTransactionState> state(
-          new tablet::WriteTransactionState(tablet_peer_.get(), &req, &resp));
+          new tablet::WriteTransactionState(tablet_peer_.get(),
+                                            &req,
+                                            nullptr, // No RequestIdPB
+                                            &resp));
       state->set_completion_callback(gscoped_ptr<tablet::TransactionCompletionCallback>(
           new tablet::LatchTransactionCompletionCallback<WriteResponsePB>(&latch, &resp)));
       ASSERT_OK(tablet_peer_->SubmitWrite(std::move(state)));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index f1f9738..85e24a2 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -259,10 +259,7 @@ static void SetupErrorAndRespond(TabletServerErrorPB* error,
 
   StatusToPB(s, error->mutable_status());
   error->set_code(code);
-  // TODO: rename RespondSuccess() to just "Respond" or
-  // "SendResponse" since we use it for application-level error
-  // responses, and this just looks confusing!
-  context->RespondSuccess();
+  context->RespondNoCache();
 }
 
 template <class ReqType, class RespType>
@@ -732,9 +729,11 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
     return;
   }
 
-  unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer.get(),
-                                                                       req,
-                                                                       resp));
+  unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(
+      tablet_peer.get(),
+      req,
+      context->AreResultsTracked() ? context->request_id() : nullptr,
+      resp));
 
   // If the client sent us a timestamp, decode it and update the clock so that all future
   // timestamps are greater than the passed timestamp.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index a528871..319011a 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -36,6 +36,8 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/master/master.pb.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/result_tracker.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -45,6 +47,7 @@
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/remote_bootstrap_client.h"
 #include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tablet_service.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
@@ -126,6 +129,7 @@ using consensus::StartRemoteBootstrapRequestPB;
 using log::Log;
 using master::ReportedTabletPB;
 using master::TabletReportPB;
+using rpc::ResultTracker;
 using std::shared_ptr;
 using std::string;
 using std::vector;
@@ -640,6 +644,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
     s = BootstrapTablet(meta,
                         scoped_refptr<server::Clock>(server_->clock()),
                         server_->mem_tracker(),
+                        server_->result_tracker(),
                         metric_registry_,
                         tablet_peer->status_listener(),
                         &tablet,
@@ -660,6 +665,7 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
     s =  tablet_peer->Init(tablet,
                            scoped_refptr<server::Clock>(server_->clock()),
                            server_->messenger(),
+                           server_->result_tracker(),
                            log,
                            tablet->GetMetricEntity());
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 820016f..a0d362b 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -52,6 +52,10 @@ class ReportedTabletPB;
 class TabletReportPB;
 } // namespace master
 
+namespace rpc {
+class ResultTracker;
+} // namespace rpc
+
 namespace tablet {
 class TabletMetadata;
 class TabletPeer;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6d2679bd/src/kudu/tserver/tserver_service.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto
index b6010fd..eba8fa9 100644
--- a/src/kudu/tserver/tserver_service.proto
+++ b/src/kudu/tserver/tserver_service.proto
@@ -18,12 +18,15 @@ package kudu.tserver;
 
 option java_package = "org.kududb.tserver";
 
+import "kudu/rpc/rpc_header.proto";
 import "kudu/tserver/tserver.proto";
 
 service TabletServerService {
 
   rpc Ping(PingRequestPB) returns (PingResponsePB);
-  rpc Write(WriteRequestPB) returns (WriteResponsePB);
+  rpc Write(WriteRequestPB) returns (WriteResponsePB)  {
+    option (kudu.rpc.track_rpc_result) = true;
+  }
   rpc Scan(ScanRequestPB) returns (ScanResponsePB);
   rpc ScannerKeepAlive(ScannerKeepAliveRequestPB) returns (ScannerKeepAliveResponsePB);
   rpc ListTablets(ListTabletsRequestPB) returns (ListTabletsResponsePB);