You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/04 22:14:28 UTC

[3/5] kudu git commit: Rename TabletPeer to TabletReplica

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
new file mode 100644
index 0000000..c48da6a
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/partial_row.h"
+#include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/server/clock.h"
+#include "kudu/server/logical_clock.h"
+#include "kudu/tablet/transactions/transaction.h"
+#include "kudu/tablet/transactions/transaction_driver.h"
+#include "kudu/tablet/transactions/write_transaction.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
+
+METRIC_DECLARE_entity(tablet);
+
+DECLARE_int32(flush_threshold_mb);
+
+namespace kudu {
+namespace tablet {
+
+using consensus::CommitMsg;
+using consensus::Consensus;
+using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
+using consensus::MakeOpId;
+using consensus::MinimumOpId;
+using consensus::OpId;
+using consensus::OpIdEquals;
+using consensus::RaftPeerPB;
+using consensus::WRITE_OP;
+using log::Log;
+using log::LogAnchorRegistry;
+using log::LogOptions;
+using rpc::Messenger;
+using server::Clock;
+using server::LogicalClock;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+using tserver::WriteRequestPB;
+using tserver::WriteResponsePB;
+
+static Schema GetTestSchema() {
+  return Schema({ ColumnSchema("key", INT32) }, 1);
+}
+
+class TabletReplicaTest : public KuduTabletTest {
+ public:
+  TabletReplicaTest()
+    : KuduTabletTest(GetTestSchema()),
+      insert_counter_(0),
+      delete_counter_(0) {
+  }
+
+  virtual void SetUp() OVERRIDE {
+    KuduTabletTest::SetUp();
+
+    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
+
+    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
+    ASSERT_OK(builder.Build(&messenger_));
+
+    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+
+    RaftPeerPB config_peer;
+    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
+    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
+    config_peer.mutable_last_known_addr()->set_port(0);
+    config_peer.set_member_type(RaftPeerPB::VOTER);
+
+    // "Bootstrap" and start the TabletReplica.
+    tablet_replica_.reset(
+      new TabletReplica(make_scoped_refptr(tablet()->metadata()),
+                        config_peer,
+                        apply_pool_.get(),
+                        Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
+                             Unretained(this),
+                             tablet()->tablet_id())));
+
+    // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
+    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
+    // TabletMetadata for consumption by TabletReplica before Tablet is instantiated.
+    tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
+
+    RaftConfigPB config;
+    config.add_peers()->CopyFrom(config_peer);
+    config.set_opid_index(consensus::kInvalidOpIdIndex);
+
+    unique_ptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
+                                        tablet()->tablet_id(),
+                                        tablet()->metadata()->fs_manager()->uuid(),
+                                        config,
+                                        consensus::kMinimumTerm, &cmeta));
+
+    scoped_refptr<Log> log;
+    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
+                               *tablet()->schema(), tablet()->metadata()->schema_version(),
+                               metric_entity_.get(), &log));
+
+    tablet_replica_->SetBootstrapping();
+    ASSERT_OK(tablet_replica_->Init(tablet(),
+                                 clock(),
+                                 messenger_,
+                                 scoped_refptr<rpc::ResultTracker>(),
+                                 log,
+                                 metric_entity_));
+  }
+
+  Status StartPeer(const ConsensusBootstrapInfo& info) {
+    const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+    RETURN_NOT_OK(tablet_replica_->Start(info));
+    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
+    return Status::OK();
+  }
+
+  void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
+    LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
+  }
+
+  virtual void TearDown() OVERRIDE {
+    tablet_replica_->Shutdown();
+    apply_pool_->Shutdown();
+    KuduTabletTest::TearDown();
+  }
+
+ protected:
+  // Generate monotonic sequence of key column integers.
+  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
+    Schema schema(GetTestSchema());
+    write_req->set_tablet_id(tablet()->tablet_id());
+    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
+
+    KuduPartialRow row(&schema);
+    CHECK_OK(row.SetInt32("key", insert_counter_++));
+
+    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
+    enc.Add(RowOperationsPB::INSERT, row);
+    return Status::OK();
+  }
+
+  // Generate monotonic sequence of deletions, starting with 0.
+  // Will assert if you try to delete more rows than you inserted.
+  Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
+    CHECK_LT(delete_counter_, insert_counter_);
+    Schema schema(GetTestSchema());
+    write_req->set_tablet_id(tablet()->tablet_id());
+    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
+
+    KuduPartialRow row(&schema);
+    CHECK_OK(row.SetInt32("key", delete_counter_++));
+
+    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
+    enc.Add(RowOperationsPB::DELETE, row);
+    return Status::OK();
+  }
+
+  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
+    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica,
+                                                                         &req,
+                                                                         nullptr, // No RequestIdPB
+                                                                         resp.get()));
+
+    CountDownLatch rpc_latch(1);
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
+
+    CHECK_OK(tablet_replica->SubmitWrite(std::move(tx_state)));
+    rpc_latch.Wait();
+    CHECK(!resp->has_error())
+        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+
+    // Roll the log after each write.
+    // Usually the append thread does the roll and no additional sync is required. However in
+    // this test the thread that is appending is not the same thread that is rolling the log
+    // so we must make sure the Log's queue is flushed before we roll or we might have a race
+    // between the appender thread and the thread executing the test.
+    CHECK_OK(tablet_replica->log_->WaitUntilAllFlushed());
+    CHECK_OK(tablet_replica->log_->AllocateSegmentAndRollOver());
+    return Status::OK();
+  }
+
+  // Execute insert requests and roll log after each one.
+  Status ExecuteInsertsAndRollLogs(int num_inserts) {
+    for (int i = 0; i < num_inserts; i++) {
+      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+    }
+
+    return Status::OK();
+  }
+
+  // Execute delete requests and roll log after each one.
+  Status ExecuteDeletesAndRollLogs(int num_deletes) {
+    for (int i = 0; i < num_deletes; i++) {
+      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+      CHECK_OK(GenerateSequentialDeleteRequest(req.get()));
+      CHECK_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+    }
+
+    return Status::OK();
+  }
+
+  void AssertNoLogAnchors() {
+    // Make sure that there are no registered anchors in the registry
+    CHECK_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  }
+
+  // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
+  void AssertLogAnchorEarlierThanLogLatest() {
+    log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+    OpId last_log_opid;
+    tablet_replica_->log_->GetLatestEntryOpId(&last_log_opid);
+    CHECK_LT(retention.for_durability, last_log_opid.index())
+      << "Expected valid log anchor, got earliest opid: " << retention.for_durability
+      << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
+      << ")";
+  }
+
+  // We disable automatic log GC. Don't leak those changes.
+  google::FlagSaver flag_saver_;
+
+  int32_t insert_counter_;
+  int32_t delete_counter_;
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<Messenger> messenger_;
+  scoped_refptr<TabletReplica> tablet_replica_;
+  gscoped_ptr<ThreadPool> apply_pool_;
+};
+
+// A Transaction that waits on the apply_continue latch inside of Apply().
+class DelayedApplyTransaction : public WriteTransaction {
+ public:
+  DelayedApplyTransaction(CountDownLatch* apply_started,
+                          CountDownLatch* apply_continue,
+                          unique_ptr<WriteTransactionState> state)
+      : WriteTransaction(std::move(state), consensus::LEADER),
+        apply_started_(DCHECK_NOTNULL(apply_started)),
+        apply_continue_(DCHECK_NOTNULL(apply_continue)) {
+  }
+
+  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
+    apply_started_->CountDown();
+    LOG(INFO) << "Delaying apply...";
+    apply_continue_->Wait();
+    LOG(INFO) << "Apply proceeding";
+    return WriteTransaction::Apply(commit_msg);
+  }
+
+ private:
+  CountDownLatch* apply_started_;
+  CountDownLatch* apply_continue_;
+  DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction);
+};
+
+// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
+TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(3));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(4, segments.size());
+
+  AssertLogAnchorEarlierThanLogLatest();
+  ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
+
+  // Ensure nothing gets deleted.
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
+
+  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
+  tablet_replica_->tablet()->Flush();
+  AssertNoLogAnchors();
+
+  // The first two segments should be deleted.
+  // The last is anchored due to the commit in the last segment being the last
+  // OpId in the log.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
+TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(3, segments.size());
+
+  // Flush MRS & GC log so the next mutation goes into a DMS.
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  // We will only GC 1, and have 1 left because the earliest needed OpId falls
+  // back to the latest OpId written to the Log if no anchors are set.
+  ASSERT_EQ(1, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+  AssertNoLogAnchors();
+
+  OpId id;
+  log->GetLatestEntryOpId(&id);
+  LOG(INFO) << "Before: " << SecureShortDebugString(id);
+
+
+  // We currently have no anchors and the last operation in the log is 0.3
+  // Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking
+  // what I think is a wrong assertion.
+  // I.e. since 0.4 is the last operation that we know is in memory 0.4 is the
+  // last anchor we expect _and_ it's the last op in the log.
+  // Only if we apply two operations is the last anchored operation and the
+  // last operation in the log different.
+
+  // Execute a mutation.
+  ASSERT_OK(ExecuteDeletesAndRollLogs(2));
+  AssertLogAnchorEarlierThanLogLatest();
+  ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(4, segments.size());
+
+  // Execute another couple inserts, but Flush it so it doesn't anchor.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(6, segments.size());
+
+  // Ensure the delta and last insert remain in the logs, anchored by the delta.
+  // Note that this will allow GC of the 2nd insert done above.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(1, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Flush DMS to release the anchor.
+  tablet_replica_->tablet()->FlushBiggestDMS();
+
+  // Verify no anchors after Flush().
+  AssertNoLogAnchors();
+
+  // We should only hang onto one segment due to no anchors.
+  // The last log OpId is the commit in the last segment, so it only anchors
+  // that segment, not the previous, because it's not the first OpId in the
+  // segment.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(3, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
+TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(4));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
+  ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  tablet_replica_->tablet()->Flush();
+
+  // Verify no anchors after Flush().
+  AssertNoLogAnchors();
+
+  // Now create a long-lived Transaction that hangs during Apply().
+  // Allow other transactions to go through. Logs should be populated, but the
+  // long-lived Transaction should prevent the log from being deleted since it
+  // is in-flight.
+  CountDownLatch rpc_latch(1);
+  CountDownLatch apply_started(1);
+  CountDownLatch apply_continue(1);
+  gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+  gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
+  {
+    // Long-running mutation.
+    ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica_.get(),
+                                                                         req.get(),
+                                                                         nullptr, // No RequestIdPB
+                                                                         resp.get()));
+
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
+
+    gscoped_ptr<DelayedApplyTransaction> transaction(
+        new DelayedApplyTransaction(&apply_started,
+                                    &apply_continue,
+                                    std::move(tx_state)));
+
+    scoped_refptr<TransactionDriver> driver;
+    ASSERT_OK(tablet_replica_->NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
+                                                       &driver));
+
+    ASSERT_OK(driver->ExecuteAsync());
+    apply_started.Wait();
+    ASSERT_TRUE(driver->GetOpId().IsInitialized())
+      << "By the time a transaction is applied, it should have an Opid";
+    // The apply will hang until we CountDown() the continue latch.
+    // Now, roll the log. Below, we execute a few more insertions with rolling.
+    ASSERT_OK(log->AllocateSegmentAndRollOver());
+  }
+
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  // The log anchor is currently equal to the latest OpId written to the Log
+  // because we are delaying the Commit message with the CountDownLatch.
+
+  // GC the first four segments created by the inserts.
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(4, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+
+  // We use mutations here, since an MRS Flush() quiesces the tablet, and we
+  // want to ensure the only thing "anchoring" is the TransactionTracker.
+  ASSERT_OK(ExecuteDeletesAndRollLogs(3));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+  ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  tablet_replica_->tablet()->FlushBiggestDMS();
+  ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+
+  AssertLogAnchorEarlierThanLogLatest();
+
+  // Try to GC(), nothing should be deleted due to the in-flight transaction.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(0, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Now we release the transaction and wait for everything to complete.
+  // We fully quiesce and flush, which should release all anchors.
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  apply_continue.CountDown();
+  rpc_latch.Wait();
+  tablet_replica_->txn_tracker_.WaitForAllToFinish();
+  ASSERT_EQ(0, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  tablet_replica_->tablet()->FlushBiggestDMS();
+  AssertNoLogAnchors();
+
+  // All should be deleted except the two last segments.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(3, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+TEST_F(TabletReplicaTest, TestGCEmptyLog) {
+  ConsensusBootstrapInfo info;
+  tablet_replica_->Start(info);
+  // We don't wait on consensus on purpose.
+  ASSERT_OK(tablet_replica_->RunLogGC());
+}
+
+TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
+  FLAGS_flush_threshold_mb = 64;
+
+  MaintenanceOpStats stats;
+
+  // Just on the threshold and not enough time has passed for a time-based flush.
+  stats.set_ram_anchored(64 * 1024 * 1024);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
+  ASSERT_EQ(0.0, stats.perf_improvement());
+  stats.Clear();
+
+  // Just on the threshold and enough time has passed, we'll have a low improvement.
+  stats.set_ram_anchored(64 * 1024 * 1024);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000);
+  ASSERT_GT(stats.perf_improvement(), 0.01);
+  stats.Clear();
+
+  // Way over the threshold, number is much higher than 1.
+  stats.set_ram_anchored(128 * 1024 * 1024);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
+  ASSERT_LT(1.0, stats.perf_improvement());
+  stats.Clear();
+
+  // Below the threshold but have been there a long time, closing in to 1.0.
+  stats.set_ram_anchored(30 * 1024 * 1024);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 60 * 50 * 1000);
+  ASSERT_LT(0.7, stats.perf_improvement());
+  ASSERT_GT(1.0, stats.perf_improvement());
+  stats.Clear();
+}
+
+} // namespace tablet
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
new file mode 100644
index 0000000..3914dc9
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -0,0 +1,669 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_replica.h"
+
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/tablet/transactions/transaction_driver.h"
+#include "kudu/tablet/transactions/alter_schema_transaction.h"
+#include "kudu/tablet/transactions/write_transaction.h"
+#include "kudu/tablet/tablet_bootstrap.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+namespace tablet {
+
+METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",
+                        MetricUnit::kTasks,
+                        "Number of operations waiting to be prepared within this tablet. "
+                        "High queue lengths indicate that the server is unable to process "
+                        "operations as fast as they are being written to the WAL.",
+                        10000, 2);
+
+METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time",
+                        MetricUnit::kMicroseconds,
+                        "Time that operations spent waiting in the prepare queue before being "
+                        "processed. High queue times indicate that the server is unable to "
+                        "process operations as fast as they are being written to the WAL.",
+                        10000000, 2);
+
+METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time",
+                        MetricUnit::kMicroseconds,
+                        "Time that operations spent being prepared in the tablet. "
+                        "High values may indicate that the server is under-provisioned or "
+                        "that operations are experiencing high contention with one another for "
+                        "locks.",
+                        10000000, 2);
+
+using consensus::Consensus;
+using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
+using consensus::ConsensusOptions;
+using consensus::ConsensusRound;
+using consensus::OpId;
+using consensus::RaftConfigPB;
+using consensus::RaftPeerPB;
+using consensus::RaftConsensus;
+using consensus::TimeManager;
+using consensus::ALTER_SCHEMA_OP;
+using consensus::WRITE_OP;
+using log::Log;
+using log::LogAnchorRegistry;
+using rpc::Messenger;
+using rpc::ResultTracker;
+using strings::Substitute;
+using tserver::TabletServerErrorPB;
+
+TabletReplica::TabletReplica(const scoped_refptr<TabletMetadata>& meta,
+                             const consensus::RaftPeerPB& local_peer_pb,
+                             ThreadPool* apply_pool,
+                             Callback<void(const std::string& reason)> mark_dirty_clbk)
+    : meta_(meta),
+      tablet_id_(meta->tablet_id()),
+      local_peer_pb_(local_peer_pb),
+      state_(NOT_STARTED),
+      last_status_("Tablet initializing..."),
+      apply_pool_(apply_pool),
+      log_anchor_registry_(new LogAnchorRegistry()),
+      mark_dirty_clbk_(std::move(mark_dirty_clbk)) {}
+
+TabletReplica::~TabletReplica() {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  // We should either have called Shutdown(), or we should have never called
+  // Init().
+  CHECK(!tablet_)
+      << "TabletReplica not fully shut down. State: "
+      << TabletStatePB_Name(state_);
+}
+
+Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
+                           const scoped_refptr<server::Clock>& clock,
+                           const shared_ptr<Messenger>& messenger,
+                           const scoped_refptr<ResultTracker>& result_tracker,
+                           const scoped_refptr<Log>& log,
+                           const scoped_refptr<MetricEntity>& metric_entity) {
+
+  DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
+  DCHECK(log) << "A TabletReplica must be provided with a Log";
+
+  RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_));
+  prepare_pool_->SetQueueLengthHistogram(
+      METRIC_op_prepare_queue_length.Instantiate(metric_entity));
+  prepare_pool_->SetQueueTimeMicrosHistogram(
+      METRIC_op_prepare_queue_time.Instantiate(metric_entity));
+  prepare_pool_->SetRunTimeMicrosHistogram(
+      METRIC_op_prepare_run_time.Instantiate(metric_entity));
+
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    CHECK_EQ(BOOTSTRAPPING, state_);
+    tablet_ = tablet;
+    clock_ = clock;
+    messenger_ = messenger;
+    log_ = log;
+    result_tracker_ = result_tracker;
+
+    ConsensusOptions options;
+    options.tablet_id = meta_->tablet_id();
+
+    TRACE("Creating consensus instance");
+
+    unique_ptr<ConsensusMetadata> cmeta;
+    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
+                                          meta_->fs_manager()->uuid(), &cmeta));
+
+    scoped_refptr<TimeManager> time_manager(new TimeManager(
+        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
+
+    consensus_ = RaftConsensus::Create(options,
+                                       std::move(cmeta),
+                                       local_peer_pb_,
+                                       metric_entity,
+                                       time_manager,
+                                       this,
+                                       messenger_,
+                                       log_.get(),
+                                       tablet_->mem_tracker(),
+                                       mark_dirty_clbk_);
+  }
+
+  if (tablet_->metrics() != nullptr) {
+    TRACE("Starting instrumentation");
+    txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
+  }
+  txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
+
+  TRACE("TabletReplica::Init() finished");
+  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
+  return Status::OK();
+}
+
+Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info) {
+  std::lock_guard<simple_spinlock> l(state_change_lock_);
+  TRACE("Starting consensus");
+
+  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
+
+  VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
+
+  RETURN_NOT_OK(consensus_->Start(bootstrap_info));
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    CHECK_EQ(state_, BOOTSTRAPPING);
+    state_ = RUNNING;
+  }
+
+  // Because we changed the tablet state, we need to re-report the tablet to the master.
+  mark_dirty_clbk_.Run("Started TabletReplica");
+
+  return Status::OK();
+}
+
+const consensus::RaftConfigPB TabletReplica::RaftConfig() const {
+  CHECK(consensus_) << "consensus is null";
+  return consensus_->CommittedConfig();
+}
+
+void TabletReplica::Shutdown() {
+
+  LOG(INFO) << "Initiating TabletReplica shutdown for tablet: " << tablet_id_;
+
+  {
+    std::unique_lock<simple_spinlock> lock(lock_);
+    if (state_ == QUIESCING || state_ == SHUTDOWN) {
+      lock.unlock();
+      WaitUntilShutdown();
+      return;
+    }
+    state_ = QUIESCING;
+  }
+
+  std::lock_guard<simple_spinlock> l(state_change_lock_);
+  // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
+  // to ensure that any currently running operation finishes before we proceed with
+  // the rest of the shutdown sequence. In particular, a maintenance operation could
+  // indirectly end up calling into the log, which we are about to shut down.
+  if (tablet_) tablet_->UnregisterMaintenanceOps();
+  UnregisterMaintenanceOps();
+
+  if (consensus_) consensus_->Shutdown();
+
+  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
+  LOG_SLOW_EXECUTION(WARNING, 1000,
+      Substitute("TabletReplica: tablet $0: Waiting for Transactions to complete", tablet_id())) {
+    txn_tracker_.WaitForAllToFinish();
+  }
+
+  if (prepare_pool_) {
+    prepare_pool_->Shutdown();
+  }
+
+  if (log_) {
+    WARN_NOT_OK(log_->Close(), "Error closing the Log.");
+  }
+
+  if (VLOG_IS_ON(1)) {
+    VLOG(1) << "TabletReplica: tablet " << tablet_id() << " shut down!";
+  }
+
+  if (tablet_) {
+    tablet_->Shutdown();
+  }
+
+  // Only mark the peer as SHUTDOWN when all other components have shut down.
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    // Release mem tracker resources.
+    consensus_.reset();
+    tablet_.reset();
+    state_ = SHUTDOWN;
+  }
+}
+
+void TabletReplica::WaitUntilShutdown() {
+  while (true) {
+    {
+      std::lock_guard<simple_spinlock> lock(lock_);
+      if (state_ == SHUTDOWN) {
+        return;
+      }
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+}
+
+Status TabletReplica::CheckRunning() const {
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    if (state_ != RUNNING) {
+      return Status::IllegalState(Substitute("The tablet is not in a running state: $0",
+                                             TabletStatePB_Name(state_)));
+    }
+  }
+  return Status::OK();
+}
+
+Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
+  MonoTime start(MonoTime::Now());
+
+  int backoff_exp = 0;
+  const int kMaxBackoffExp = 8;
+  while (true) {
+    bool has_consensus = false;
+    TabletStatePB cached_state;
+    {
+      std::lock_guard<simple_spinlock> lock(lock_);
+      cached_state = state_;
+      if (consensus_) {
+        has_consensus = true; // consensus_ is a set-once object.
+      }
+    }
+    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
+      return Status::IllegalState(
+          Substitute("The tablet is already shutting down or shutdown. State: $0",
+                     TabletStatePB_Name(cached_state)));
+    }
+    if (cached_state == RUNNING && has_consensus && consensus_->IsRunning()) {
+      break;
+    }
+    MonoTime now(MonoTime::Now());
+    MonoDelta elapsed(now - start);
+    if (elapsed > timeout) {
+      return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State; $1",
+                                         elapsed.ToString(), TabletStatePB_Name(cached_state)));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
+    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
+  }
+  return Status::OK();
+}
+
+Status TabletReplica::SubmitWrite(unique_ptr<WriteTransactionState> state) {
+  RETURN_NOT_OK(CheckRunning());
+
+  state->SetResultTracker(result_tracker_);
+  gscoped_ptr<WriteTransaction> transaction(new WriteTransaction(std::move(state),
+                                                                 consensus::LEADER));
+  scoped_refptr<TransactionDriver> driver;
+  RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
+                                           &driver));
+  return driver->ExecuteAsync();
+}
+
+Status TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaTransactionState> state) {
+  RETURN_NOT_OK(CheckRunning());
+
+  gscoped_ptr<AlterSchemaTransaction> transaction(
+      new AlterSchemaTransaction(std::move(state), consensus::LEADER));
+  scoped_refptr<TransactionDriver> driver;
+  RETURN_NOT_OK(NewLeaderTransactionDriver(transaction.PassAs<Transaction>(), &driver));
+  return driver->ExecuteAsync();
+}
+
+void TabletReplica::GetTabletStatusPB(TabletStatusPB* status_pb_out) const {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  DCHECK(status_pb_out != nullptr);
+  status_pb_out->set_tablet_id(meta_->tablet_id());
+  status_pb_out->set_table_name(meta_->table_name());
+  status_pb_out->set_last_status(last_status_);
+  meta_->partition().ToPB(status_pb_out->mutable_partition());
+  status_pb_out->set_state(state_);
+  status_pb_out->set_tablet_data_state(meta_->tablet_data_state());
+  if (tablet_) {
+    status_pb_out->set_estimated_on_disk_size(tablet_->EstimateOnDiskSize());
+  }
+}
+
+Status TabletReplica::RunLogGC() {
+  if (!CheckRunning().ok()) {
+    return Status::OK();
+  }
+  int32_t num_gced;
+  log::RetentionIndexes retention = GetRetentionIndexes();
+  Status s = log_->GC(retention, &num_gced);
+  if (!s.ok()) {
+    s = s.CloneAndPrepend("Unexpected error while running Log GC from TabletReplica");
+    LOG(ERROR) << s.ToString();
+  }
+  return Status::OK();
+}
+
+void TabletReplica::StatusMessage(const std::string& status) {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  last_status_ = status;
+}
+
+string TabletReplica::last_status() const {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  return last_status_;
+}
+
+void TabletReplica::SetFailed(const Status& error) {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  CHECK(!error.ok());
+  state_ = FAILED;
+  error_ = error;
+  last_status_ = error.ToString();
+}
+
+string TabletReplica::HumanReadableState() const {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  TabletDataState data_state = meta_->tablet_data_state();
+  // If failed, any number of things could have gone wrong.
+  if (state_ == FAILED) {
+    return Substitute("$0 ($1): $2", TabletStatePB_Name(state_),
+                      TabletDataState_Name(data_state),
+                      error_.ToString());
+  // If it's copying, or tombstoned, that is the important thing
+  // to show.
+  } else if (data_state != TABLET_DATA_READY) {
+    return TabletDataState_Name(data_state);
+  }
+  // Otherwise, the tablet's data is in a "normal" state, so we just display
+  // the runtime state (BOOTSTRAPPING, RUNNING, etc).
+  return TabletStatePB_Name(state_);
+}
+
+void TabletReplica::GetInFlightTransactions(Transaction::TraceType trace_type,
+                                            vector<consensus::TransactionStatusPB>* out) const {
+  vector<scoped_refptr<TransactionDriver> > pending_transactions;
+  txn_tracker_.GetPendingTransactions(&pending_transactions);
+  for (const scoped_refptr<TransactionDriver>& driver : pending_transactions) {
+    if (driver->state() != nullptr) {
+      consensus::TransactionStatusPB status_pb;
+      status_pb.mutable_op_id()->CopyFrom(driver->GetOpId());
+      switch (driver->tx_type()) {
+        case Transaction::WRITE_TXN:
+          status_pb.set_tx_type(consensus::WRITE_OP);
+          break;
+        case Transaction::ALTER_SCHEMA_TXN:
+          status_pb.set_tx_type(consensus::ALTER_SCHEMA_OP);
+          break;
+      }
+      status_pb.set_description(driver->ToString());
+      int64_t running_for_micros =
+          (MonoTime::Now() - driver->start_time()).ToMicroseconds();
+      status_pb.set_running_for_micros(running_for_micros);
+      if (trace_type == Transaction::TRACE_TXNS) {
+        status_pb.set_trace_buffer(driver->trace()->DumpToString());
+      }
+      out->push_back(status_pb);
+    }
+  }
+}
+
+log::RetentionIndexes TabletReplica::GetRetentionIndexes() const {
+  // Let consensus set a minimum index that should be anchored.
+  // This ensures that we:
+  //   (a) don't GC any operations which are still in flight
+  //   (b) don't GC any operations that are needed to catch up lagging peers.
+  log::RetentionIndexes ret = consensus_->GetRetentionIndexes();
+
+  // If we never have written to the log, no need to proceed.
+  if (ret.for_durability == 0) return ret;
+
+  // Next, we interrogate the anchor registry.
+  // Returns OK if minimum known, NotFound if no anchors are registered.
+  {
+    int64_t min_anchor_index;
+    Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index);
+    if (PREDICT_FALSE(!s.ok())) {
+      DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString();
+    } else {
+      ret.for_durability = std::min(ret.for_durability, min_anchor_index);
+    }
+  }
+
+  // Next, interrogate the TransactionTracker.
+  vector<scoped_refptr<TransactionDriver> > pending_transactions;
+  txn_tracker_.GetPendingTransactions(&pending_transactions);
+  for (const scoped_refptr<TransactionDriver>& driver : pending_transactions) {
+    OpId tx_op_id = driver->GetOpId();
+    // A transaction which doesn't have an opid hasn't been submitted for replication yet and
+    // thus has no need to anchor the log.
+    if (tx_op_id.IsInitialized()) {
+      ret.for_durability = std::min(ret.for_durability, tx_op_id.index());
+    }
+  }
+
+  return ret;
+}
+
+Status TabletReplica::GetReplaySizeMap(map<int64_t, int64_t>* replay_size_map) const {
+  RETURN_NOT_OK(CheckRunning());
+  log_->GetReplaySizeMap(replay_size_map);
+  return Status::OK();
+}
+
+Status TabletReplica::GetGCableDataSize(int64_t* retention_size) const {
+  RETURN_NOT_OK(CheckRunning());
+  *retention_size = log_->GetGCableDataSize(GetRetentionIndexes());
+  return Status::OK();
+}
+
+Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
+      return Status::IllegalState(TabletStatePB_Name(state_));
+    }
+  }
+
+  consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
+  DCHECK(replicate_msg->has_timestamp());
+  gscoped_ptr<Transaction> transaction;
+  switch (replicate_msg->op_type()) {
+    case WRITE_OP:
+    {
+      DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
+          " transaction must receive a WriteRequestPB";
+      unique_ptr<WriteTransactionState> tx_state(
+          new WriteTransactionState(
+              this,
+              &replicate_msg->write_request(),
+              replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
+      tx_state->SetResultTracker(result_tracker_);
+
+      transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
+      break;
+    }
+    case ALTER_SCHEMA_OP:
+    {
+      DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
+          " transaction must receive an AlterSchemaRequestPB";
+      unique_ptr<AlterSchemaTransactionState> tx_state(
+          new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(),
+                                          nullptr));
+      transaction.reset(
+          new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA));
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unsupported Operation Type";
+  }
+
+  // TODO(todd) Look at wiring the stuff below on the driver
+  TransactionState* state = transaction->state();
+  state->set_consensus_round(round);
+
+  scoped_refptr<TransactionDriver> driver;
+  RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
+
+  // Unretained is required to avoid a refcount cycle.
+  state->consensus_round()->SetConsensusReplicatedCallback(
+      Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
+
+  RETURN_NOT_OK(driver->ExecuteAsync());
+  return Status::OK();
+}
+
+Status TabletReplica::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                                 scoped_refptr<TransactionDriver>* driver) {
+  scoped_refptr<TransactionDriver> tx_driver = new TransactionDriver(
+    &txn_tracker_,
+    consensus_.get(),
+    log_.get(),
+    prepare_pool_.get(),
+    apply_pool_,
+    &txn_order_verifier_);
+  RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::LEADER));
+  driver->swap(tx_driver);
+
+  return Status::OK();
+}
+
+Status TabletReplica::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                                  scoped_refptr<TransactionDriver>* driver) {
+  scoped_refptr<TransactionDriver> tx_driver = new TransactionDriver(
+    &txn_tracker_,
+    consensus_.get(),
+    log_.get(),
+    prepare_pool_.get(),
+    apply_pool_,
+    &txn_order_verifier_);
+  RETURN_NOT_OK(tx_driver->Init(std::move(transaction), consensus::REPLICA));
+  driver->swap(tx_driver);
+
+  return Status::OK();
+}
+
+void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
+  // Taking state_change_lock_ ensures that we don't shut down concurrently with
+  // this last start-up task.
+  std::lock_guard<simple_spinlock> l(state_change_lock_);
+
+  if (state() != RUNNING) {
+    LOG(WARNING) << "Not registering maintenance operations for " << tablet_
+                 << ": tablet not in RUNNING state";
+    return;
+  }
+
+  DCHECK(maintenance_ops_.empty());
+
+  gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
+  maint_mgr->RegisterOp(mrs_flush_op.get());
+  maintenance_ops_.push_back(mrs_flush_op.release());
+
+  gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
+  maint_mgr->RegisterOp(dms_flush_op.get());
+  maintenance_ops_.push_back(dms_flush_op.release());
+
+  gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
+  maint_mgr->RegisterOp(log_gc.get());
+  maintenance_ops_.push_back(log_gc.release());
+
+  tablet_->RegisterMaintenanceOps(maint_mgr);
+}
+
+void TabletReplica::UnregisterMaintenanceOps() {
+  DCHECK(state_change_lock_.is_locked());
+  for (MaintenanceOp* op : maintenance_ops_) {
+    op->Unregister();
+  }
+  STLDeleteElements(&maintenance_ops_);
+}
+
+Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
+  // This callback is triggered prior to any TabletMetadata flush.
+  // The guarantee that we are trying to enforce is this:
+  //
+  //   If an operation has been flushed to stable storage (eg a DRS or DeltaFile)
+  //   then its COMMIT message must be present in the log.
+  //
+  // The purpose for this is so that, during bootstrap, we can accurately identify
+  // whether each operation has been flushed. If we don't see a COMMIT message for
+  // an operation, then we assume it was not completely applied and needs to be
+  // re-applied. Thus, if we had something on disk but with no COMMIT message,
+  // we'd attempt to double-apply the write, resulting in an error (eg trying to
+  // delete an already-deleted row).
+  //
+  // So, to enforce this property, we do two steps:
+  //
+  // 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC.
+  //
+  // Because the operations always enqueue their COMMIT message to the log
+  // before calling Commit(), this ensures that any in-flight operations have
+  // their commit messages "en route".
+  //
+  // NOTE: we only wait for those operations that have started their Apply() phase.
+  // Any operations which haven't yet started applying haven't made any changes
+  // to in-memory state: thus, they obviously couldn't have made any changes to
+  // on-disk storage either (data can only get to the disk by going through an in-memory
+  // store). Only those that have started Apply() could have potentially written some
+  // data which is now on disk.
+  //
+  // Perhaps more importantly, if we waited on operations that hadn't started their
+  // Apply() phase, we might be waiting forever -- for example, if a follower has been
+  // partitioned from its leader, it may have operations sitting around in flight
+  // for quite a long time before eventually aborting or committing. This would
+  // end up blocking all flushes if we waited on it.
+  //
+  // 2) Flush the log
+  //
+  // This ensures that the above-mentioned commit messages are not just enqueued
+  // to the log, but also on disk.
+  VLOG(1) << "T " << tablet_->metadata()->tablet_id()
+      <<  ": Waiting for in-flight transactions to commit.";
+  LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") {
+    tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit();
+  }
+  VLOG(1) << "T " << tablet_->metadata()->tablet_id()
+      << ": Waiting for the log queue to be flushed.";
+  LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") {
+    RETURN_NOT_OK(log_->WaitUntilAllFlushed());
+  }
+  return Status::OK();
+}
+
+
+}  // namespace tablet
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
new file mode 100644
index 0000000..7f207cb
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica.h
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_TABLET_TABLET_REPLICA_H_
+#define KUDU_TABLET_TABLET_REPLICA_H_
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/transaction_order_verifier.h"
+#include "kudu/tablet/transactions/transaction_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/semaphore.h"
+
+namespace kudu {
+
+namespace log {
+class LogAnchorRegistry;
+}
+
+namespace rpc {
+class Messenger;
+class ResultTracker;
+}
+
+namespace tserver {
+class CatchUpServiceTest;
+}
+
+class MaintenanceManager;
+class MaintenanceOp;
+
+namespace tablet {
+class LeaderTransactionDriver;
+class ReplicaTransactionDriver;
+class TabletReplica;
+class TabletStatusPB;
+class TabletStatusListener;
+class TransactionDriver;
+
+// Interface by which various tablet-related processes can report back their status
+// to TabletReplica without having to have a circular class dependency, and so that
+// those other classes can be easily tested without constructing a TabletReplica.
+class TabletStatusListener {
+ public:
+  virtual ~TabletStatusListener() {}
+
+  virtual void StatusMessage(const std::string& status) = 0;
+};
+
+// A replica in a tablet consensus configuration, which coordinates writes to tablets.
+// Each time Write() is called this class appends a new entry to a replicated
+// state machine through a consensus algorithm, which makes sure that other
+// peers see the same updates in the same order. In addition to this, this
+// class also splits the work and coordinates multi-threaded execution.
+class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
+                      public consensus::ReplicaTransactionFactory,
+                      public TabletStatusListener {
+ public:
+  TabletReplica(const scoped_refptr<TabletMetadata>& meta,
+                const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool,
+                Callback<void(const std::string& reason)> mark_dirty_clbk);
+
+  // Initializes the TabletReplica, namely creating the Log and initializing
+  // Consensus.
+  Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
+              const scoped_refptr<server::Clock>& clock,
+              const std::shared_ptr<rpc::Messenger>& messenger,
+              const scoped_refptr<rpc::ResultTracker>& result_tracker,
+              const scoped_refptr<log::Log>& log,
+              const scoped_refptr<MetricEntity>& metric_entity);
+
+  // Starts the TabletReplica, making it available for Write()s. If this
+  // TabletReplica is part of a consensus configuration this will connect it to other replicas
+  // in the consensus configuration.
+  Status Start(const consensus::ConsensusBootstrapInfo& info);
+
+  // Shutdown this tablet replica.
+  // If a shutdown is already in progress, blocks until that shutdown is complete.
+  void Shutdown();
+
+  // Check that the tablet is in a RUNNING state.
+  Status CheckRunning() const;
+
+  // Wait until the tablet is in a RUNNING state or if there's a timeout.
+  // TODO have a way to wait for any state?
+  Status WaitUntilConsensusRunning(const MonoDelta& timeout);
+
+  // Submits a write to a tablet and executes it asynchronously.
+  // The caller is expected to build and pass a TrasactionContext that points
+  // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
+  // MvccManager.
+  Status SubmitWrite(std::unique_ptr<WriteTransactionState> tx_state);
+
+  // Called by the tablet service to start an alter schema transaction.
+  //
+  // The transaction contains all the information required to execute the
+  // AlterSchema operation and send the response back.
+  //
+  // If the returned Status is OK, the response to the client will be sent
+  // asynchronously. Otherwise the tablet service will have to send the response directly.
+  //
+  // The AlterSchema operation is taking the tablet component lock in exclusive mode
+  // meaning that no other operation on the tablet can be executed while the
+  // AlterSchema is in progress.
+  Status SubmitAlterSchema(std::unique_ptr<AlterSchemaTransactionState> tx_state);
+
+  void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
+
+  // Used by consensus to create and start a new ReplicaTransaction.
+  virtual Status StartReplicaTransaction(
+      const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
+
+  consensus::Consensus* consensus() {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return consensus_.get();
+  }
+
+  scoped_refptr<consensus::Consensus> shared_consensus() const {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return consensus_;
+  }
+
+  Tablet* tablet() const {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return tablet_.get();
+  }
+
+  scoped_refptr<consensus::TimeManager> time_manager() const {
+    return consensus_->time_manager();
+  }
+
+  std::shared_ptr<Tablet> shared_tablet() const {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return tablet_;
+  }
+
+  const TabletStatePB state() const {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return state_;
+  }
+
+  // Returns the current Raft configuration.
+  const consensus::RaftConfigPB RaftConfig() const;
+
+  // If any peers in the consensus configuration lack permanent uuids, get them via an
+  // RPC call and update.
+  // TODO: move this to raft_consensus.h.
+  Status UpdatePermanentUuids();
+
+  // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
+  void SetBootstrapping() {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    CHECK_EQ(NOT_STARTED, state_);
+    state_ = BOOTSTRAPPING;
+  }
+
+  // Implementation of TabletStatusListener::StatusMessage().
+  void StatusMessage(const std::string& status) override;
+
+  // Retrieve the last human-readable status of this tablet replica.
+  std::string last_status() const;
+
+  // Sets the tablet state to FAILED additionally setting the error to the provided
+  // one.
+  void SetFailed(const Status& error);
+
+  // Returns the error that occurred, when state is FAILED.
+  Status error() const {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    return error_;
+  }
+
+  // Returns a human-readable string indicating the state of the tablet.
+  // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING",
+  // etc. For use in places like the Web UI.
+  std::string HumanReadableState() const;
+
+  // Adds list of transactions in-flight at the time of the call to 'out'.
+  void GetInFlightTransactions(Transaction::TraceType trace_type,
+                               std::vector<consensus::TransactionStatusPB>* out) const;
+
+  // Returns the log indexes to be retained for durability and to catch up peers.
+  // Used for selection of log segments to delete during Log GC.
+  log::RetentionIndexes GetRetentionIndexes() const;
+
+  // See Log::GetReplaySizeMap(...).
+  //
+  // Returns a non-ok status if the tablet isn't running.
+  Status GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size_map) const;
+
+  // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
+  //
+  // Returns a non-ok status if the tablet isn't running.
+  Status GetGCableDataSize(int64_t* retention_size) const;
+
+  // Return a pointer to the Log.
+  // TabletReplica keeps a reference to Log after Init().
+  log::Log* log() const {
+    return log_.get();
+  }
+
+  server::Clock* clock() {
+    return clock_.get();
+  }
+
+  const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry() const {
+    return log_anchor_registry_;
+  }
+
+  // Returns the tablet_id of the tablet managed by this TabletReplica.
+  // Returns the correct tablet_id even if the underlying tablet is not available
+  // yet.
+  const std::string& tablet_id() const { return tablet_id_; }
+
+  // Convenience method to return the permanent_uuid of this peer.
+  std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
+
+  Status NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                    scoped_refptr<TransactionDriver>* driver);
+
+  Status NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                     scoped_refptr<TransactionDriver>* driver);
+
+  // Tells the tablet's log to garbage collect.
+  Status RunLogGC();
+
+  // Register the maintenance ops associated with this peer's tablet, also invokes
+  // Tablet::RegisterMaintenanceOps().
+  void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
+
+  // Unregister the maintenance ops associated with this peer's tablet.
+  // This method is not thread safe.
+  void UnregisterMaintenanceOps();
+
+  // Return pointer to the transaction tracker for this peer.
+  const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
+
+  const scoped_refptr<TabletMetadata>& tablet_metadata() const {
+    return meta_;
+  }
+
+  // Marks the tablet as dirty so that it's included in the next heartbeat.
+  void MarkTabletDirty(const std::string& reason) {
+    mark_dirty_clbk_.Run(reason);
+  }
+
+ private:
+  friend class RefCountedThreadSafe<TabletReplica>;
+  friend class TabletReplicaTest;
+  FRIEND_TEST(TabletReplicaTest, TestMRSAnchorPreventsLogGC);
+  FRIEND_TEST(TabletReplicaTest, TestDMSAnchorPreventsLogGC);
+  FRIEND_TEST(TabletReplicaTest, TestActiveTransactionPreventsLogGC);
+
+  ~TabletReplica();
+
+  // Wait until the TabletReplica is fully in SHUTDOWN state.
+  void WaitUntilShutdown();
+
+  // After bootstrap is complete and consensus is setup this initiates the transactions
+  // that were not complete on bootstrap.
+  // Not implemented yet. See .cc file.
+  Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
+                                  const consensus::ConsensusBootstrapInfo& bootstrap_info);
+
+  const scoped_refptr<TabletMetadata> meta_;
+
+  const std::string tablet_id_;
+
+  const consensus::RaftPeerPB local_peer_pb_;
+
+  TabletStatePB state_;
+  Status error_;
+  TransactionTracker txn_tracker_;
+  TransactionOrderVerifier txn_order_verifier_;
+  scoped_refptr<log::Log> log_;
+  std::shared_ptr<Tablet> tablet_;
+  std::shared_ptr<rpc::Messenger> messenger_;
+  scoped_refptr<consensus::Consensus> consensus_;
+  simple_spinlock prepare_replicate_lock_;
+
+  // Lock protecting state_, last_status_, as well as smart pointers to collaborating
+  // classes such as tablet_ and consensus_.
+  mutable simple_spinlock lock_;
+
+  // The human-readable last status of the tablet, displayed on the web page, command line
+  // tools, etc.
+  std::string last_status_;
+
+  // Lock taken during Init/Shutdown which ensures that only a single thread
+  // attempts to perform major lifecycle operations (Init/Shutdown) at once.
+  // This must be acquired before acquiring lock_ if they are acquired together.
+  // We don't just use lock_ since the lifecycle operations may take a while
+  // and we'd like other threads to be able to quickly poll the state_ variable
+  // during them in order to reject RPCs, etc.
+  mutable simple_spinlock state_change_lock_;
+
+  // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_'
+  // is single-threaded, moving to a multi-tablet setup where multiple TabletReplicas
+  // use the same 'prepare_pool_' needs to enforce that, for a single
+  // TabletReplica, PrepareTasks are executed *serially*.
+  // TODO move the prepare pool to TabletServer.
+  gscoped_ptr<ThreadPool> prepare_pool_;
+
+  // Pool that executes apply tasks for transactions. This is a multi-threaded
+  // pool, constructor-injected by either the Master (for system tables) or
+  // the Tablet server.
+  ThreadPool* apply_pool_;
+
+  scoped_refptr<server::Clock> clock_;
+
+  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
+
+  // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager.
+  //
+  // Must be called whenever cluster membership or leadership changes, or when
+  // the tablet's schema changes.
+  Callback<void(const std::string& reason)> mark_dirty_clbk_;
+
+  // List of maintenance operations for the tablet that need information that only the peer
+  // can provide.
+  std::vector<MaintenanceOp*> maintenance_ops_;
+
+  // The result tracker for writes.
+  scoped_refptr<rpc::ResultTracker> result_tracker_;
+
+  DISALLOW_COPY_AND_ASSIGN(TabletReplica);
+};
+
+// A callback to wait for the in-flight transactions to complete and to flush
+// the Log when they do.
+// Tablet is passed as a raw pointer as this callback is set in TabletMetadata and
+// were we to keep the tablet as a shared_ptr a circular dependency would occur:
+// callback->tablet->metadata->callback. Since the tablet indirectly owns this
+// callback we know that is must still be alive when it fires.
+class FlushInflightsToLogCallback : public RefCountedThreadSafe<FlushInflightsToLogCallback> {
+ public:
+  FlushInflightsToLogCallback(Tablet* tablet,
+                              const scoped_refptr<log::Log>& log)
+   : tablet_(tablet),
+     log_(log) {}
+
+  Status WaitForInflightsAndFlushLog();
+
+ private:
+  Tablet* tablet_;
+  scoped_refptr<log::Log> log_;
+};
+
+
+}  // namespace tablet
+}  // namespace kudu
+
+#endif /* KUDU_TABLET_TABLET_REPLICA_H_ */

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
new file mode 100644
index 0000000..85650ff
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <map>
+#include <mutex>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/metrics.h"
+
+DEFINE_int32(flush_threshold_mb, 1024,
+             "Size at which MemRowSet flushes are triggered. "
+             "A MRS can still flush below this threshold if it if hasn't flushed in a while, "
+             "or if the server-wide memory limit has been reached.");
+TAG_FLAG(flush_threshold_mb, experimental);
+
+DEFINE_int32(flush_threshold_secs, 2 * 60,
+             "Number of seconds after which a non-empty MemRowSet will become flushable "
+             "even if it is not large.");
+TAG_FLAG(flush_threshold_secs, experimental);
+
+
+METRIC_DEFINE_gauge_uint32(tablet, log_gc_running,
+                           "Log GCs Running",
+                           kudu::MetricUnit::kOperations,
+                           "Number of log GC operations currently running.");
+METRIC_DEFINE_histogram(tablet, log_gc_duration,
+                        "Log GC Duration",
+                        kudu::MetricUnit::kMilliseconds,
+                        "Time spent garbage collecting the logs.", 60000LU, 1);
+
+namespace kudu {
+namespace tablet {
+
+using std::map;
+using strings::Substitute;
+
+// Upper bound for how long it takes to reach "full perf improvement" in time-based flushing.
+const double kFlushUpperBoundMs = 60 * 60 * 1000;
+
+//
+// FlushOpPerfImprovementPolicy.
+//
+
+void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats,
+                                                              double elapsed_ms) {
+  if (stats->ram_anchored() > FLAGS_flush_threshold_mb * 1024 * 1024) {
+    // If we're over the user-specified flush threshold, then consider the perf
+    // improvement to be 1 for every extra MB.  This produces perf_improvement results
+    // which are much higher than any compaction would produce, and means that, when
+    // there is an MRS over threshold, a flush will almost always be selected instead of
+    // a compaction.  That's not necessarily a good thing, but in the absence of better
+    // heuristics, it will do for now.
+    double extra_mb =
+        static_cast<double>(FLAGS_flush_threshold_mb - (stats->ram_anchored()) / (1024 * 1024));
+    stats->set_perf_improvement(extra_mb);
+  } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000) {
+    // Even if we aren't over the threshold, consider flushing if we haven't flushed
+    // in a long time. But, don't give it a large perf_improvement score. We should
+    // only do this if we really don't have much else to do, and if we've already waited a bit.
+    // The following will give an improvement that's between 0.0 and 1.0, gradually growing
+    // as 'elapsed_ms' approaches 'kFlushUpperBoundMs'.
+    double perf = elapsed_ms / kFlushUpperBoundMs;
+    if (perf > 1.0) {
+      perf = 1.0;
+    }
+    stats->set_perf_improvement(perf);
+  }
+}
+
+//
+// FlushMRSOp.
+//
+
+void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+
+  map<int64_t, int64_t> replay_size_map;
+  if (tablet_replica_->tablet()->MemRowSetEmpty() ||
+      !tablet_replica_->GetReplaySizeMap(&replay_size_map).ok()) {
+    return;
+  }
+
+  {
+    std::unique_lock<Semaphore> lock(tablet_replica_->tablet()->rowsets_flush_sem_,
+                                     std::defer_lock);
+    stats->set_runnable(lock.try_lock());
+  }
+
+  stats->set_ram_anchored(tablet_replica_->tablet()->MemRowSetSize());
+  stats->set_logs_retained_bytes(
+      tablet_replica_->tablet()->MemRowSetLogReplaySize(replay_size_map));
+
+  // TODO(todd): use workload statistics here to find out how "hot" the tablet has
+  // been in the last 5 minutes.
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
+      stats,
+      time_since_flush_.elapsed().wall_millis());
+}
+
+bool FlushMRSOp::Prepare() {
+  // Try to acquire the rowsets_flush_sem_.  If we can't, the Prepare step
+  // fails.  This also implies that only one instance of FlushMRSOp can be
+  // running at once.
+  return tablet_replica_->tablet()->rowsets_flush_sem_.try_lock();
+}
+
+void FlushMRSOp::Perform() {
+  CHECK(!tablet_replica_->tablet()->rowsets_flush_sem_.try_lock());
+
+  KUDU_CHECK_OK_PREPEND(tablet_replica_->tablet()->FlushUnlocked(),
+                        Substitute("FlushMRS failed on $0", tablet_replica_->tablet_id()));
+
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    time_since_flush_.start();
+  }
+  tablet_replica_->tablet()->rowsets_flush_sem_.unlock();
+}
+
+scoped_refptr<Histogram> FlushMRSOp::DurationHistogram() const {
+  return tablet_replica_->tablet()->metrics()->flush_mrs_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > FlushMRSOp::RunningGauge() const {
+  return tablet_replica_->tablet()->metrics()->flush_mrs_running;
+}
+
+//
+// FlushDeltaMemStoresOp.
+//
+
+void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  int64_t dms_size;
+  int64_t retention_size;
+  map<int64_t, int64_t> max_idx_to_replay_size;
+  if (tablet_replica_->tablet()->DeltaMemRowSetEmpty() ||
+      !tablet_replica_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
+    return;
+  }
+  tablet_replica_->tablet()->GetInfoForBestDMSToFlush(max_idx_to_replay_size,
+                                                   &dms_size, &retention_size);
+
+  stats->set_ram_anchored(dms_size);
+  stats->set_runnable(true);
+  stats->set_logs_retained_bytes(retention_size);
+
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
+      stats,
+      time_since_flush_.elapsed().wall_millis());
+}
+
+void FlushDeltaMemStoresOp::Perform() {
+  map<int64_t, int64_t> max_idx_to_replay_size;
+  if (!tablet_replica_->GetReplaySizeMap(&max_idx_to_replay_size).ok()) {
+    LOG(WARNING) << "Won't flush deltas since tablet shutting down: "
+                 << tablet_replica_->tablet_id();
+    return;
+  }
+  KUDU_CHECK_OK_PREPEND(tablet_replica_->tablet()->FlushDMSWithHighestRetention(
+                            max_idx_to_replay_size),
+                            Substitute("Failed to flush DMS on $0",
+                                       tablet_replica_->tablet()->tablet_id()));
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    time_since_flush_.start();
+  }
+}
+
+scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
+  return tablet_replica_->tablet()->metrics()->flush_dms_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() const {
+  return tablet_replica_->tablet()->metrics()->flush_dms_running;
+}
+
+//
+// LogGCOp.
+//
+
+LogGCOp::LogGCOp(TabletReplica* tablet_replica)
+    : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
+                    MaintenanceOp::LOW_IO_USAGE),
+      tablet_replica_(tablet_replica),
+      log_gc_duration_(METRIC_log_gc_duration.Instantiate(
+                           tablet_replica->tablet()->GetMetricEntity())),
+      log_gc_running_(METRIC_log_gc_running.Instantiate(
+                          tablet_replica->tablet()->GetMetricEntity(), 0)),
+      sem_(1) {}
+
+void LogGCOp::UpdateStats(MaintenanceOpStats* stats) {
+  int64_t retention_size;
+
+  if (!tablet_replica_->GetGCableDataSize(&retention_size).ok()) {
+    return;
+  }
+
+  stats->set_logs_retained_bytes(retention_size);
+  stats->set_runnable(sem_.GetValue() == 1);
+}
+
+bool LogGCOp::Prepare() {
+  return sem_.try_lock();
+}
+
+void LogGCOp::Perform() {
+  CHECK(!sem_.try_lock());
+
+  tablet_replica_->RunLogGC();
+
+  sem_.unlock();
+}
+
+scoped_refptr<Histogram> LogGCOp::DurationHistogram() const {
+  return log_gc_duration_;
+}
+
+scoped_refptr<AtomicGauge<uint32_t> > LogGCOp::RunningGauge() const {
+  return log_gc_running_;
+}
+
+}  // namespace tablet
+}  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
new file mode 100644
index 0000000..fc391ed
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_
+#define KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_
+
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/stopwatch.h"
+
+namespace kudu {
+
+class Histogram;
+template<class T>
+class AtomicGauge;
+
+namespace tablet {
+
+class FlushOpPerfImprovementPolicy {
+ public:
+  ~FlushOpPerfImprovementPolicy() {}
+
+  // Sets the performance improvement based on the anchored ram if it's over the threshold,
+  // else it will set it based on how long it has been since the last flush.
+  static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms);
+
+ private:
+  FlushOpPerfImprovementPolicy() {}
+};
+
+// Maintenance op for MRS flush. Only one can happen at a time.
+class FlushMRSOp : public MaintenanceOp {
+ public:
+  explicit FlushMRSOp(TabletReplica* tablet_replica)
+    : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
+                    MaintenanceOp::HIGH_IO_USAGE),
+      tablet_replica_(tablet_replica) {
+    time_since_flush_.start();
+  }
+
+  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+  virtual bool Prepare() OVERRIDE;
+
+  virtual void Perform() OVERRIDE;
+
+  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+  // Lock protecting time_since_flush_.
+  mutable simple_spinlock lock_;
+  Stopwatch time_since_flush_;
+
+  TabletReplica *const tablet_replica_;
+};
+
+// Maintenance op for DMS flush.
+// Reports stats for all the DMS this tablet contains but only flushes one in Perform().
+class FlushDeltaMemStoresOp : public MaintenanceOp {
+ public:
+  explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
+    : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
+                                 tablet_replica->tablet()->tablet_id().c_str()),
+                    MaintenanceOp::HIGH_IO_USAGE),
+      tablet_replica_(tablet_replica) {
+    time_since_flush_.start();
+  }
+
+  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+  virtual bool Prepare() OVERRIDE {
+    return true;
+  }
+
+  virtual void Perform() OVERRIDE;
+
+  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+  // Lock protecting time_since_flush_
+  mutable simple_spinlock lock_;
+  Stopwatch time_since_flush_;
+
+  TabletReplica *const tablet_replica_;
+};
+
+// Maintenance task that runs log GC. Reports log retention that represents the amount of data
+// that can be GC'd.
+//
+// Only one LogGC op can run at a time.
+class LogGCOp : public MaintenanceOp {
+ public:
+  explicit LogGCOp(TabletReplica* tablet_replica);
+
+  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+  virtual bool Prepare() OVERRIDE;
+
+  virtual void Perform() OVERRIDE;
+
+  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+  TabletReplica *const tablet_replica_;
+  scoped_refptr<Histogram> log_gc_duration_;
+  scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
+  mutable Semaphore sem_;
+};
+
+} // namespace tablet
+} // namespace kudu
+
+#endif /* KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_ */

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/alter_schema_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index 92a626d..38d2c56 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -23,7 +23,7 @@
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/tablet/tablet.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/pb_util.h"
@@ -86,7 +86,7 @@ Status AlterSchemaTransaction::Prepare() {
     return s;
   }
 
-  Tablet* tablet = state_->tablet_peer()->tablet();
+  Tablet* tablet = state_->tablet_replica()->tablet();
   RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema.get()));
 
   state_->AddToAutoReleasePool(schema.release());
@@ -106,15 +106,15 @@ Status AlterSchemaTransaction::Start() {
 Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
   TRACE("APPLY ALTER-SCHEMA: Starting");
 
-  Tablet* tablet = state_->tablet_peer()->tablet();
+  Tablet* tablet = state_->tablet_replica()->tablet();
   RETURN_NOT_OK(tablet->AlterSchema(state()));
-  state_->tablet_peer()->log()
+  state_->tablet_replica()->log()
     ->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
                                                  state_->schema_version());
 
   // Altered tablets should be included in the next tserver heartbeat so that
   // clients waiting on IsAlterTableDone() are unblocked promptly.
-  state_->tablet_peer()->MarkTabletDirty("Alter schema finished");
+  state_->tablet_replica()->MarkTabletDirty("Alter schema finished");
 
   commit_msg->reset(new CommitMsg());
   (*commit_msg)->set_op_type(ALTER_SCHEMA_OP);

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/alter_schema_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.h b/src/kudu/tablet/transactions/alter_schema_transaction.h
index cfcae3f..e1e0ab9 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.h
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.h
@@ -42,10 +42,10 @@ class AlterSchemaTransactionState : public TransactionState {
   ~AlterSchemaTransactionState() {
   }
 
-  AlterSchemaTransactionState(TabletPeer* tablet_peer,
+  AlterSchemaTransactionState(TabletReplica* tablet_replica,
                               const tserver::AlterSchemaRequestPB* request,
                               tserver::AlterSchemaResponsePB* response)
-      : TransactionState(tablet_peer),
+      : TransactionState(tablet_replica),
         schema_(NULL),
         request_(request),
         response_(response) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.cc b/src/kudu/tablet/transactions/transaction.cc
index b8681a1..efa6688 100644
--- a/src/kudu/tablet/transactions/transaction.cc
+++ b/src/kudu/tablet/transactions/transaction.cc
@@ -28,8 +28,8 @@ Transaction::Transaction(TransactionState* state, DriverType type, TransactionTy
       tx_type_(tx_type) {
 }
 
-TransactionState::TransactionState(TabletPeer* tablet_peer)
-    : tablet_peer_(tablet_peer),
+TransactionState::TransactionState(TabletReplica* tablet_replica)
+    : tablet_replica_(tablet_replica),
       completion_clbk_(new TransactionCompletionCallback()),
       timestamp_error_(0),
       arena_(1024, 4 * 1024 * 1024),

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 4d8b4d5..f0696a6 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -37,7 +37,7 @@ class ResultTracker;
 } // namespace rpc
 
 namespace tablet {
-class TabletPeer;
+class TabletReplica;
 class TransactionCompletionCallback;
 class TransactionState;
 
@@ -169,8 +169,8 @@ class TransactionState {
     return consensus_round_.get();
   }
 
-  TabletPeer* tablet_peer() const {
-    return tablet_peer_;
+  TabletReplica* tablet_replica() const {
+    return tablet_replica_;
   }
 
   // Return metrics related to this transaction.
@@ -257,13 +257,13 @@ class TransactionState {
   }
 
  protected:
-  explicit TransactionState(TabletPeer* tablet_peer);
+  explicit TransactionState(TabletReplica* tablet_replica);
   virtual ~TransactionState();
 
   TransactionMetrics tx_metrics_;
 
-  // The tablet peer that is coordinating this transaction.
-  TabletPeer* const tablet_peer_;
+  // The TabletReplica that is coordinating this transaction.
+  TabletReplica* const tablet_replica_;
 
   // The result tracker that will cache the result of this transaction.
   scoped_refptr<rpc::ResultTracker> result_tracker_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 3050f34..cb6bf64 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -23,7 +23,7 @@
 #include "kudu/consensus/time_manager.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/rpc/result_tracker.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/transactions/transaction_tracker.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/debug/trace_event.h"
@@ -442,7 +442,7 @@ Status TransactionDriver::ApplyAsync() {
       order_verifier_->CheckApply(op_id_copy_.index(), prepare_physical_timestamp_);
       // Now that the transaction is committed in consensus advance the safe time.
       if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) {
-        transaction_->state()->tablet_peer()->tablet()->mvcc_manager()->
+        transaction_->state()->tablet_replica()->tablet()->mvcc_manager()->
             AdjustSafeTime(transaction_->state()->timestamp());
       }
     } else {
@@ -515,7 +515,7 @@ void TransactionDriver::SetResponseTimestamp(TransactionState* transaction_state
 Status TransactionDriver::CommitWait() {
   MonoTime before = MonoTime::Now();
   DCHECK(mutable_state()->external_consistency_mode() == COMMIT_WAIT);
-  RETURN_NOT_OK(mutable_state()->tablet_peer()->clock()->WaitUntilAfter(
+  RETURN_NOT_OK(mutable_state()->tablet_replica()->clock()->WaitUntilAfter(
       mutable_state()->timestamp(), MonoTime::Max()));
   mutable_state()->mutable_metrics()->commit_wait_duration_usec =
       (MonoTime::Now() - before).ToMicroseconds();

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.cc b/src/kudu/tablet/transactions/transaction_tracker.cc
index 3661747..3ac55a9 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker.cc
@@ -23,7 +23,7 @@
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/transactions/transaction_driver.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
@@ -97,12 +97,12 @@ Status TransactionTracker::Add(TransactionDriver* driver) {
     }
 
     // May be null in unit tests.
-    TabletPeer* peer = driver->state()->tablet_peer();
+    TabletReplica* replica = driver->state()->tablet_replica();
 
     string msg = Substitute(
         "Transaction failed, tablet $0 transaction memory consumption ($1) "
         "has exceeded its limit ($2) or the limit of an ancestral tracker",
-        peer ? peer->tablet()->tablet_id() : "(unknown)",
+        replica ? replica->tablet()->tablet_id() : "(unknown)",
         mem_tracker_->consumption(), mem_tracker_->limit());
 
     KLOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;

http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.h b/src/kudu/tablet/transactions/transaction_tracker.h
index 88a9103..3a3725b 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.h
+++ b/src/kudu/tablet/transactions/transaction_tracker.h
@@ -39,7 +39,7 @@ class MetricEntity;
 namespace tablet {
 class TransactionDriver;
 
-// Each TabletPeer has a TransactionTracker which keeps track of pending transactions.
+// Each TabletReplica has a TransactionTracker which keeps track of pending transactions.
 // Each "LeaderTransaction" will register itself by calling Add().
 // It will remove itself by calling Release().
 class TransactionTracker {