You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/04 22:14:28 UTC
[3/5] kudu git commit: Rename TabletPeer to TabletReplica
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
new file mode 100644
index 0000000..c48da6a
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/partial_row.h"
+#include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_reader.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/server/clock.h"
+#include "kudu/server/logical_clock.h"
+#include "kudu/tablet/transactions/transaction.h"
+#include "kudu/tablet/transactions/transaction_driver.h"
+#include "kudu/tablet/transactions/write_transaction.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
+
+METRIC_DECLARE_entity(tablet);
+
+DECLARE_int32(flush_threshold_mb);
+
+namespace kudu {
+namespace tablet {
+
+using consensus::CommitMsg;
+using consensus::Consensus;
+using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
+using consensus::MakeOpId;
+using consensus::MinimumOpId;
+using consensus::OpId;
+using consensus::OpIdEquals;
+using consensus::RaftPeerPB;
+using consensus::WRITE_OP;
+using log::Log;
+using log::LogAnchorRegistry;
+using log::LogOptions;
+using rpc::Messenger;
+using server::Clock;
+using server::LogicalClock;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+using tserver::WriteRequestPB;
+using tserver::WriteResponsePB;
+
+static Schema GetTestSchema() {  // Schema used by the fixture and by the insert/delete request builders.
+ return Schema({ ColumnSchema("key", INT32) }, 1);  // One INT32 column "key"; the 1 is presumably the key-column count -- confirm against Schema.
+}
+
+class TabletReplicaTest : public KuduTabletTest {  // Fixture: one TabletReplica layered on the harness Tablet.
+ public:
+  TabletReplicaTest()
+    : KuduTabletTest(GetTestSchema()),
+      insert_counter_(0),
+      delete_counter_(0) {
+  }
+
+  virtual void SetUp() OVERRIDE {  // Builds pool, messenger, metrics, cmeta and log, then Init()s the replica.
+    KuduTabletTest::SetUp();
+
+    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
+
+    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
+    ASSERT_OK(builder.Build(&messenger_));
+
+    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
+
+    RaftPeerPB config_peer;  // The only peer in the config: this replica, as a VOTER.
+    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
+    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
+    config_peer.mutable_last_known_addr()->set_port(0);
+    config_peer.set_member_type(RaftPeerPB::VOTER);
+
+    // "Bootstrap" and start the TabletReplica.
+    tablet_replica_.reset(
+      new TabletReplica(make_scoped_refptr(tablet()->metadata()),
+                        config_peer,
+                        apply_pool_.get(),
+                        Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
+                             Unretained(this),
+                             tablet()->tablet_id())));
+
+    // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
+    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
+    // TabletMetadata for consumption by TabletReplica before Tablet is instantiated.
+    tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
+
+    RaftConfigPB config;
+    config.add_peers()->CopyFrom(config_peer);
+    config.set_opid_index(consensus::kInvalidOpIdIndex);
+
+    unique_ptr<ConsensusMetadata> cmeta;  // Created on disk here; TabletReplica::Init() loads it again.
+    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
+                                        tablet()->tablet_id(),
+                                        tablet()->metadata()->fs_manager()->uuid(),
+                                        config,
+                                        consensus::kMinimumTerm, &cmeta));
+
+    scoped_refptr<Log> log;
+    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
+                        *tablet()->schema(), tablet()->metadata()->schema_version(),
+                        metric_entity_.get(), &log));
+
+    tablet_replica_->SetBootstrapping();  // Init() CHECKs that the state is BOOTSTRAPPING.
+    ASSERT_OK(tablet_replica_->Init(tablet(),
+                                    clock(),
+                                    messenger_,
+                                    scoped_refptr<rpc::ResultTracker>(),
+                                    log,
+                                    metric_entity_));
+  }
+
+  Status StartPeer(const ConsensusBootstrapInfo& info) {  // Starts the replica and waits up to 10s for leadership.
+    const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+    RETURN_NOT_OK(tablet_replica_->Start(info));
+    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
+    return Status::OK();
+  }
+
+  void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
+    LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
+  }
+
+  virtual void TearDown() OVERRIDE {
+    if (tablet_replica_) tablet_replica_->Shutdown();  // Still null if SetUp() failed before creating it.
+    if (apply_pool_) apply_pool_->Shutdown();  // gtest runs TearDown() even after a fatal SetUp() failure.
+    KuduTabletTest::TearDown();
+  }
+
+ protected:
+  // Generate monotonic sequence of key column integers. Encoding errors are returned, not fatal.
+  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
+    Schema schema(GetTestSchema());
+    write_req->set_tablet_id(tablet()->tablet_id());
+    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
+
+    KuduPartialRow row(&schema);
+    RETURN_NOT_OK(row.SetInt32("key", insert_counter_++));
+
+    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
+    enc.Add(RowOperationsPB::INSERT, row);
+    return Status::OK();
+  }
+
+  // Generate monotonic sequence of deletions, starting with 0.
+  // Will assert if you try to delete more rows than you inserted.
+  Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
+    CHECK_LT(delete_counter_, insert_counter_);
+    Schema schema(GetTestSchema());
+    write_req->set_tablet_id(tablet()->tablet_id());
+    RETURN_NOT_OK(SchemaToPB(schema, write_req->mutable_schema()));
+
+    KuduPartialRow row(&schema);
+    RETURN_NOT_OK(row.SetInt32("key", delete_counter_++));
+
+    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
+    enc.Add(RowOperationsPB::DELETE, row);
+    return Status::OK();
+  }
+
+  Status ExecuteWriteAndRollLog(TabletReplica* tablet_replica, const WriteRequestPB& req) {
+    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica,
+                                                                         &req,
+                                                                         nullptr, // No RequestIdPB
+                                                                         resp.get()));
+
+    CountDownLatch rpc_latch(1);  // Counted down by the completion callback registered below.
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
+
+    CHECK_OK(tablet_replica->SubmitWrite(std::move(tx_state)));  // Kept fatal: the callback points at locals.
+    rpc_latch.Wait();
+    CHECK(!resp->has_error())
+        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
+
+    // Roll the log after each write.
+    // Usually the append thread does the roll and no additional sync is required. However in
+    // this test the thread that is appending is not the same thread that is rolling the log
+    // so we must make sure the Log's queue is flushed before we roll or we might have a race
+    // between the appender thread and the thread executing the test.
+    RETURN_NOT_OK(tablet_replica->log_->WaitUntilAllFlushed());
+    RETURN_NOT_OK(tablet_replica->log_->AllocateSegmentAndRollOver());
+    return Status::OK();
+  }
+
+  // Execute insert requests and roll log after each one.
+  Status ExecuteInsertsAndRollLogs(int num_inserts) {
+    for (int i = 0; i < num_inserts; i++) {
+      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+    }
+
+    return Status::OK();
+  }
+
+  // Execute delete requests and roll log after each one (errors propagate, as for inserts).
+  Status ExecuteDeletesAndRollLogs(int num_deletes) {
+    for (int i = 0; i < num_deletes; i++) {
+      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+      RETURN_NOT_OK(GenerateSequentialDeleteRequest(req.get()));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+    }
+
+    return Status::OK();
+  }
+
+  void AssertNoLogAnchors() {
+    // Make sure that there are no registered anchors in the registry
+    CHECK_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  }
+
+  // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
+  void AssertLogAnchorEarlierThanLogLatest() {
+    log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+    OpId last_log_opid;
+    tablet_replica_->log_->GetLatestEntryOpId(&last_log_opid);
+    CHECK_LT(retention.for_durability, last_log_opid.index())
+      << "Expected valid log anchor, got earliest opid: " << retention.for_durability
+      << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
+      << ")";
+  }
+
+  // We disable automatic log GC. Don't leak those changes.
+  google::FlagSaver flag_saver_;
+
+  int32_t insert_counter_;  // Next key handed out by GenerateSequentialInsertRequest().
+  int32_t delete_counter_;  // Next key handed out by GenerateSequentialDeleteRequest().
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  shared_ptr<Messenger> messenger_;
+  scoped_refptr<TabletReplica> tablet_replica_;
+  gscoped_ptr<ThreadPool> apply_pool_;
+};
+
+// A Transaction that waits on the apply_continue latch inside of Apply().
+class DelayedApplyTransaction : public WriteTransaction {  // Test-only write transaction whose Apply() is gated on two latches.
+ public:
+ DelayedApplyTransaction(CountDownLatch* apply_started,  // Counted down once when Apply() is entered.
+ CountDownLatch* apply_continue,  // Apply() waits on this before doing the real apply.
+ unique_ptr<WriteTransactionState> state)
+ : WriteTransaction(std::move(state), consensus::LEADER),
+ apply_started_(DCHECK_NOTNULL(apply_started)),
+ apply_continue_(DCHECK_NOTNULL(apply_continue)) {
+ }
+
+ virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
+ apply_started_->CountDown();  // Signal the test that Apply() has been reached.
+ LOG(INFO) << "Delaying apply...";
+ apply_continue_->Wait();  // Block here until the test releases the latch.
+ LOG(INFO) << "Apply proceeding";
+ return WriteTransaction::Apply(commit_msg);  // Then delegate to the normal apply path.
+ }
+
+ private:
+ CountDownLatch* apply_started_;  // Raw pointers: the latches are supplied by the caller and not deleted here.
+ CountDownLatch* apply_continue_;
+ DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction);
+};
+
+// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
+TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;  // Out-parameter of Log::GC(): number of segments removed.
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(3));  // Each insert rolls the log: 1 + 3 segments.
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(4, segments.size());
+
+  AssertLogAnchorEarlierThanLogLatest();
+  ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
+
+  // Ensure nothing gets deleted.
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
+
+  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
+  ASSERT_OK(tablet_replica_->tablet()->Flush());  // A failed flush must fail the test here.
+  AssertNoLogAnchors();
+
+  // The first two segments should be deleted.
+  // The last is anchored due to the commit in the last segment being the last
+  // OpId in the log.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
+TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;  // Out-parameter of Log::GC(): number of segments removed.
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(3, segments.size());
+
+  // Flush MRS & GC log so the next mutation goes into a DMS.
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  // We will only GC 1, and have 1 left because the earliest needed OpId falls
+  // back to the latest OpId written to the Log if no anchors are set.
+  ASSERT_EQ(1, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+  AssertNoLogAnchors();
+
+  OpId id;
+  log->GetLatestEntryOpId(&id);
+  LOG(INFO) << "Before: " << SecureShortDebugString(id);
+
+
+  // At this point there are no anchors and the last operation in the log is 0.3.
+  // Two deletes are executed below rather than one: with a single delete, 0.4
+  // would be both the last anchored (in-memory) operation and the last
+  // operation in the log, which trips AssertLogAnchorEarlierThanLogLatest()
+  // (the earlier ExecuteDeletesAndRollLogs(1) version failed that assertion).
+  // Only once two operations have been applied do the last anchored operation
+  // and the last operation in the log differ.
+
+  // Execute a mutation.
+  ASSERT_OK(ExecuteDeletesAndRollLogs(2));
+  AssertLogAnchorEarlierThanLogLatest();
+  ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(4, segments.size());
+
+  // Execute another couple inserts, but Flush it so it doesn't anchor.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
+  ASSERT_OK(tablet_replica_->tablet()->Flush());
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(6, segments.size());
+
+  // Ensure the delta and last insert remain in the logs, anchored by the delta.
+  // Note that this will allow GC of the 2nd insert done above.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(1, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Flush DMS to release the anchor.
+  ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());  // A failed flush must fail the test here.
+
+  // Verify no anchors after Flush().
+  AssertNoLogAnchors();
+
+  // We should only hang onto one segment due to no anchors.
+  // The last log OpId is the commit in the last segment, so it only anchors
+  // that segment, not the previous, because it's not the first OpId in the
+  // segment.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(3, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
+TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartPeer(info));
+
+  Log* log = tablet_replica_->log_.get();
+  int32_t num_gced;  // Out-parameter of Log::GC(): number of segments removed.
+
+  AssertNoLogAnchors();
+
+  log::SegmentSequence segments;
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+
+  ASSERT_EQ(1, segments.size());
+  ASSERT_OK(ExecuteInsertsAndRollLogs(4));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
+  ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  ASSERT_OK(tablet_replica_->tablet()->Flush());  // A failed flush must fail the test here.
+
+  // Verify no anchors after Flush().
+  AssertNoLogAnchors();
+
+  // Now create a long-lived Transaction that hangs during Apply().
+  // Allow other transactions to go through. Logs should be populated, but the
+  // long-lived Transaction should prevent the log from being deleted since it
+  // is in-flight.
+  CountDownLatch rpc_latch(1);  // Counted down by the completion callback of the delayed write.
+  CountDownLatch apply_started(1);
+  CountDownLatch apply_continue(1);
+  gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
+  gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
+  {
+    // Long-running mutation.
+    ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
+    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_replica_.get(),
+                                                                         req.get(),
+                                                                         nullptr, // No RequestIdPB
+                                                                         resp.get()));
+
+    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
+        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
+
+    gscoped_ptr<DelayedApplyTransaction> transaction(
+        new DelayedApplyTransaction(&apply_started,
+                                    &apply_continue,
+                                    std::move(tx_state)));
+
+    scoped_refptr<TransactionDriver> driver;
+    ASSERT_OK(tablet_replica_->NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
+                                                          &driver));
+
+    ASSERT_OK(driver->ExecuteAsync());
+    apply_started.Wait();
+    ASSERT_TRUE(driver->GetOpId().IsInitialized())
+      << "By the time a transaction is applied, it should have an Opid";
+    // The apply will hang until we CountDown() the continue latch.
+    // Now, roll the log. Below, we execute a few more insertions with rolling.
+    ASSERT_OK(log->AllocateSegmentAndRollOver());
+  }
+
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  // The log anchor is currently equal to the latest OpId written to the Log
+  // because we are delaying the Commit message with the CountDownLatch.
+
+  // GC the first four segments created by the inserts.
+  log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(4, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+
+  // We use mutations here, since an MRS Flush() quiesces the tablet, and we
+  // want to ensure the only thing "anchoring" is the TransactionTracker.
+  ASSERT_OK(ExecuteDeletesAndRollLogs(3));
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+  ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());  // Releases the DMS anchor checked just above.
+  ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+
+  AssertLogAnchorEarlierThanLogLatest();
+
+  // Try to GC(), nothing should be deleted due to the in-flight transaction.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(0, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(5, segments.size());
+
+  // Now we release the transaction and wait for everything to complete.
+  // We fully quiesce and flush, which should release all anchors.
+  ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  apply_continue.CountDown();
+  rpc_latch.Wait();
+  tablet_replica_->txn_tracker_.WaitForAllToFinish();
+  ASSERT_EQ(0, tablet_replica_->txn_tracker_.GetNumPendingForTests());
+  ASSERT_OK(tablet_replica_->tablet()->FlushBiggestDMS());  // Flush what the delayed delete wrote.
+  AssertNoLogAnchors();
+
+  // All should be deleted except the two last segments.
+  retention = tablet_replica_->GetRetentionIndexes();
+  ASSERT_OK(log->GC(retention, &num_gced));
+  ASSERT_EQ(3, num_gced);
+  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
+  ASSERT_EQ(2, segments.size());
+}
+
+TEST_F(TabletReplicaTest, TestGCEmptyLog) {  // Log GC must succeed on a replica that has written nothing.
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(tablet_replica_->Start(info));  // Start() returns a Status; a failed start must fail the test.
+  // We don't wait on consensus on purpose.
+  ASSERT_OK(tablet_replica_->RunLogGC());
+}
+
+TEST_F(TabletReplicaTest, TestFlushOpsPerfImprovements) {
+  const int kMiB = 1024 * 1024;
+  FLAGS_flush_threshold_mb = 64;
+  MaintenanceOpStats flush_stats;
+
+  // Case 1: anchored RAM equals the threshold and almost no time has elapsed -> no improvement.
+  flush_stats.set_ram_anchored(64 * kMiB);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&flush_stats, 1);
+  ASSERT_EQ(0.0, flush_stats.perf_improvement());
+  flush_stats.Clear();
+
+  // Case 2: same anchored RAM, but three minutes have elapsed -> a small, non-zero improvement.
+  flush_stats.set_ram_anchored(64 * kMiB);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&flush_stats, 3 * 60 * 1000);
+  ASSERT_LT(0.01, flush_stats.perf_improvement());
+  flush_stats.Clear();
+
+  // Case 3: anchored RAM is twice the threshold -> improvement above 1.
+  flush_stats.set_ram_anchored(128 * kMiB);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&flush_stats, 1);
+  ASSERT_GT(flush_stats.perf_improvement(), 1.0);
+  flush_stats.Clear();
+
+  // Case 4: under the threshold for fifty minutes -> improvement strictly between 0.7 and 1.0.
+  flush_stats.set_ram_anchored(30 * kMiB);
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&flush_stats, 60 * 50 * 1000);
+  ASSERT_GT(flush_stats.perf_improvement(), 0.7);
+  ASSERT_LT(flush_stats.perf_improvement(), 1.0);
+  flush_stats.Clear();
+}
+
+} // namespace tablet
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
new file mode 100644
index 0000000..3914dc9
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -0,0 +1,669 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_replica.h"
+
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_util.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/gutil/mathlimits.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/tablet/transactions/transaction_driver.h"
+#include "kudu/tablet/transactions/alter_schema_transaction.h"
+#include "kudu/tablet/transactions/write_transaction.h"
+#include "kudu/tablet/tablet_bootstrap.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::map;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+namespace tablet {
+
+METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",  // Attached to the prepare pool in TabletReplica::Init().
+ MetricUnit::kTasks,
+ "Number of operations waiting to be prepared within this tablet. "
+ "High queue lengths indicate that the server is unable to process "
+ "operations as fast as they are being written to the WAL.",
+ 10000, 2);  // NOTE(review): presumably (max trackable value, significant digits) -- confirm against the macro.
+
+METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time",  // Attached via SetQueueTimeMicrosHistogram().
+ MetricUnit::kMicroseconds,
+ "Time that operations spent waiting in the prepare queue before being "
+ "processed. High queue times indicate that the server is unable to "
+ "process operations as fast as they are being written to the WAL.",
+ 10000000, 2);  // NOTE(review): same trailing-argument meaning as above -- confirm.
+
+METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time",  // Attached via SetRunTimeMicrosHistogram().
+ MetricUnit::kMicroseconds,
+ "Time that operations spent being prepared in the tablet. "
+ "High values may indicate that the server is under-provisioned or "
+ "that operations are experiencing high contention with one another for "
+ "locks.",
+ 10000000, 2);  // NOTE(review): same trailing-argument meaning as above -- confirm.
+
+using consensus::Consensus;
+using consensus::ConsensusBootstrapInfo;
+using consensus::ConsensusMetadata;
+using consensus::ConsensusOptions;
+using consensus::ConsensusRound;
+using consensus::OpId;
+using consensus::RaftConfigPB;
+using consensus::RaftPeerPB;
+using consensus::RaftConsensus;
+using consensus::TimeManager;
+using consensus::ALTER_SCHEMA_OP;
+using consensus::WRITE_OP;
+using log::Log;
+using log::LogAnchorRegistry;
+using rpc::Messenger;
+using rpc::ResultTracker;
+using strings::Substitute;
+using tserver::TabletServerErrorPB;
+
+TabletReplica::TabletReplica(const scoped_refptr<TabletMetadata>& meta,  // Only stores its arguments; the Tablet, Log and consensus are attached in Init().
+ const consensus::RaftPeerPB& local_peer_pb,
+ ThreadPool* apply_pool,  // Stored as a raw pointer; not deleted anywhere in the code shown here.
+ Callback<void(const std::string& reason)> mark_dirty_clbk)
+ : meta_(meta),
+ tablet_id_(meta->tablet_id()),  // Cached copy of the id taken from the metadata.
+ local_peer_pb_(local_peer_pb),
+ state_(NOT_STARTED),  // Init() CHECKs for BOOTSTRAPPING, so the state must be advanced before Init().
+ last_status_("Tablet initializing..."),
+ apply_pool_(apply_pool),
+ log_anchor_registry_(new LogAnchorRegistry()),  // Fresh registry; the test fixture replaces it with the Tablet's.
+ mark_dirty_clbk_(std::move(mark_dirty_clbk)) {}  // Run from Start() after the state changes.
+
+TabletReplica::~TabletReplica() {  // Enforces that the replica was shut down (or never initialized) before destruction.
+ std::lock_guard<simple_spinlock> lock(lock_);
+ // We should either have called Shutdown(), or we should have never called
+ // Init().
+ CHECK(!tablet_)  // tablet_ is set in Init() and reset at the end of Shutdown().
+ << "TabletReplica not fully shut down. State: "
+ << TabletStatePB_Name(state_);
+}
+
+Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,  // Attaches the Tablet, Log and messenger and creates the consensus instance.
+ const scoped_refptr<server::Clock>& clock,
+ const shared_ptr<Messenger>& messenger,
+ const scoped_refptr<ResultTracker>& result_tracker,  // The test fixture passes an empty refptr here.
+ const scoped_refptr<Log>& log,
+ const scoped_refptr<MetricEntity>& metric_entity) {
+
+ DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
+ DCHECK(log) << "A TabletReplica must be provided with a Log";
+
+ RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_));  // Prepare pool is capped at one thread.
+ prepare_pool_->SetQueueLengthHistogram(
+ METRIC_op_prepare_queue_length.Instantiate(metric_entity));
+ prepare_pool_->SetQueueTimeMicrosHistogram(
+ METRIC_op_prepare_queue_time.Instantiate(metric_entity));
+ prepare_pool_->SetRunTimeMicrosHistogram(
+ METRIC_op_prepare_run_time.Instantiate(metric_entity));
+
+ {
+ std::lock_guard<simple_spinlock> lock(lock_);  // Held until the closing brace, including the Load() and Create() calls below.
+ CHECK_EQ(BOOTSTRAPPING, state_);  // Fatal if the caller did not move the replica to BOOTSTRAPPING first.
+ tablet_ = tablet;
+ clock_ = clock;
+ messenger_ = messenger;
+ log_ = log;
+ result_tracker_ = result_tracker;
+
+ ConsensusOptions options;
+ options.tablet_id = meta_->tablet_id();
+
+ TRACE("Creating consensus instance");
+
+ unique_ptr<ConsensusMetadata> cmeta;
+ RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,  // NOTE(review): called while lock_ is held; Load presumably reads from disk -- confirm.
+ meta_->fs_manager()->uuid(), &cmeta));
+
+ scoped_refptr<TimeManager> time_manager(new TimeManager(
+ clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
+
+ consensus_ = RaftConsensus::Create(options,
+ std::move(cmeta),  // Ownership of the loaded metadata moves into consensus.
+ local_peer_pb_,
+ metric_entity,
+ time_manager,
+ this,  // NOTE(review): presumably passed as the transaction factory -- confirm against RaftConsensus::Create.
+ messenger_,
+ log_.get(),
+ tablet_->mem_tracker(),
+ mark_dirty_clbk_);
+ }
+
+ if (tablet_->metrics() != nullptr) {  // Instrumentation is started only when the tablet has metrics.
+ TRACE("Starting instrumentation");
+ txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
+ }
+ txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
+
+ TRACE("TabletReplica::Init() finished");
+ VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
+ return Status::OK();  // state_ is still BOOTSTRAPPING here; Start() moves it to RUNNING.
+}
+
+Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info) {  // Starts consensus, then moves the replica from BOOTSTRAPPING to RUNNING.
+ std::lock_guard<simple_spinlock> l(state_change_lock_);  // Same lock Shutdown() takes for its teardown sequence.
+ TRACE("Starting consensus");
+
+ VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
+
+ VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
+
+ RETURN_NOT_OK(consensus_->Start(bootstrap_info));  // On failure the state is left unchanged.
+ {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ CHECK_EQ(state_, BOOTSTRAPPING);  // Fatal if the state is anything other than BOOTSTRAPPING at this point.
+ state_ = RUNNING;
+ }
+
+ // Because we changed the tablet state, we need to re-report the tablet to the master.
+ mark_dirty_clbk_.Run("Started TabletReplica");  // Invoked after lock_ has been released.
+
+ return Status::OK();
+}
+
+const consensus::RaftConfigPB TabletReplica::RaftConfig() const {  // Returns consensus' committed Raft config, by value.
+ CHECK(consensus_) << "consensus is null";  // Fatal if consensus_ is unset: it is assigned in Init() and reset in Shutdown().
+ return consensus_->CommittedConfig();
+}
+
+void TabletReplica::Shutdown() {  // Tears components down in a fixed order; later calls wait for the first one to finish.
+
+ LOG(INFO) << "Initiating TabletReplica shutdown for tablet: " << tablet_id_;
+
+ {
+ std::unique_lock<simple_spinlock> lock(lock_);
+ if (state_ == QUIESCING || state_ == SHUTDOWN) {  // A shutdown is already in progress or complete.
+ lock.unlock();  // Release before waiting: WaitUntilShutdown() takes lock_ itself.
+ WaitUntilShutdown();
+ return;
+ }
+ state_ = QUIESCING;  // From here on CheckRunning() rejects new submissions.
+ }
+
+ std::lock_guard<simple_spinlock> l(state_change_lock_);  // Same lock Start() holds.
+ // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
+ // to ensure that any currently running operation finishes before we proceed with
+ // the rest of the shutdown sequence. In particular, a maintenance operation could
+ // indirectly end up calling into the log, which we are about to shut down.
+ if (tablet_) tablet_->UnregisterMaintenanceOps();
+ UnregisterMaintenanceOps();
+
+ if (consensus_) consensus_->Shutdown();
+
+ // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
+ LOG_SLOW_EXECUTION(WARNING, 1000,  // NOTE(review): 1000 is presumably a millisecond threshold -- confirm against the macro.
+ Substitute("TabletReplica: tablet $0: Waiting for Transactions to complete", tablet_id())) {
+ txn_tracker_.WaitForAllToFinish();
+ }
+
+ if (prepare_pool_) {  // Unset if Init() was never called.
+ prepare_pool_->Shutdown();
+ }
+
+ if (log_) {
+ WARN_NOT_OK(log_->Close(), "Error closing the Log.");  // Close errors are logged as warnings, not propagated.
+ }
+
+ if (VLOG_IS_ON(1)) {
+ VLOG(1) << "TabletReplica: tablet " << tablet_id() << " shut down!";  // Logged before the Tablet itself is shut down below.
+ }
+
+ if (tablet_) {
+ tablet_->Shutdown();
+ }
+
+ // Only mark the peer as SHUTDOWN when all other components have shut down.
+ {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ // Release mem tracker resources.
+ consensus_.reset();
+ tablet_.reset();  // Satisfies the destructor's CHECK(!tablet_).
+ state_ = SHUTDOWN;  // Unblocks any callers polling in WaitUntilShutdown().
+ }
+}
+
+void TabletReplica::WaitUntilShutdown() {
+  // Poll state_ under lock_ and sleep 10 ms between checks until it reads SHUTDOWN.
+  bool is_shutdown = false;
+  while (!is_shutdown) {
+    {
+      std::lock_guard<simple_spinlock> guard(lock_);
+      is_shutdown = (state_ == SHUTDOWN);
+    }
+    if (!is_shutdown) SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+}
+
+Status TabletReplica::CheckRunning() const {
+  // state_ is read only while lock_ is held.
+  std::lock_guard<simple_spinlock> guard(lock_);
+  if (state_ == RUNNING) {
+    return Status::OK();
+  }
+  // Every other state is rejected, and the message names the state.
+  return Status::IllegalState(Substitute("The tablet is not in a running state: $0",
+                                         TabletStatePB_Name(state_)));
+}
+
+Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {  // Polls until RUNNING with consensus running.
+  MonoTime start(MonoTime::Now());
+
+  int backoff_exp = 0;
+  const int kMaxBackoffExp = 8;  // Sleeps grow 1, 2, 4, ... ms and are capped at 1 << 8 = 256 ms.
+  while (true) {
+    bool has_consensus = false;
+    TabletStatePB cached_state;
+    {
+      std::lock_guard<simple_spinlock> lock(lock_);
+      cached_state = state_;
+      if (consensus_) {
+        has_consensus = true;  // Only the presence of consensus_ is captured under the lock.
+      }
+    }
+    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {  // Fail fast: consensus will never start now.
+      return Status::IllegalState(
+          Substitute("The tablet is already shutting down or shutdown. State: $0",
+                     TabletStatePB_Name(cached_state)));
+    }
+    if (cached_state == RUNNING && has_consensus && consensus_->IsRunning()) {  // NOTE(review): unlocked read; Shutdown() resets consensus_.
+      break;
+    }
+    MonoTime now(MonoTime::Now());
+    MonoDelta elapsed(now - start);
+    if (elapsed > timeout) {  // Checked after the state test, so at least one poll always happens.
+      return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State: $1",
+                                         elapsed.ToString(), TabletStatePB_Name(cached_state)));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
+    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
+  }
+  return Status::OK();
+}
+
+ // Submits a write for asynchronous execution in LEADER mode.
+ // Fails fast with the CheckRunning() status if the replica is not RUNNING, and
+ // otherwise returns the result of creating and starting the transaction driver.
+ Status TabletReplica::SubmitWrite(unique_ptr<WriteTransactionState> state) {
+   RETURN_NOT_OK(CheckRunning());
+
+   state->SetResultTracker(result_tracker_);
+   gscoped_ptr<Transaction> write_txn(
+       new WriteTransaction(std::move(state), consensus::LEADER));
+
+   scoped_refptr<TransactionDriver> txn_driver;
+   RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(write_txn), &txn_driver));
+   return txn_driver->ExecuteAsync();
+ }
+
+ // Submits an alter-schema transaction for asynchronous execution in LEADER mode.
+ // Fails fast with the CheckRunning() status if the replica is not RUNNING.
+ Status TabletReplica::SubmitAlterSchema(unique_ptr<AlterSchemaTransactionState> state) {
+   RETURN_NOT_OK(CheckRunning());
+
+   gscoped_ptr<Transaction> alter_txn(
+       new AlterSchemaTransaction(std::move(state), consensus::LEADER));
+
+   scoped_refptr<TransactionDriver> txn_driver;
+   RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(alter_txn), &txn_driver));
+   return txn_driver->ExecuteAsync();
+ }
+
+ // Fills 'status_pb_out' (must not be null) with a snapshot of this replica:
+ // tablet id, table name, last status message, partition, runtime state, data
+ // state and, when a tablet is attached, its estimated on-disk size.
+ // The whole snapshot is taken while holding lock_.
+ void TabletReplica::GetTabletStatusPB(TabletStatusPB* status_pb_out) const {
+   std::lock_guard<simple_spinlock> guard(lock_);
+   DCHECK(status_pb_out != nullptr);
+   TabletStatusPB& pb = *status_pb_out;
+   pb.set_tablet_id(meta_->tablet_id());
+   pb.set_table_name(meta_->table_name());
+   pb.set_last_status(last_status_);
+   meta_->partition().ToPB(pb.mutable_partition());
+   pb.set_state(state_);
+   pb.set_tablet_data_state(meta_->tablet_data_state());
+   if (tablet_) {
+     pb.set_estimated_on_disk_size(tablet_->EstimateOnDiskSize());
+   }
+ }
+
+ // Asks the log to garbage collect using the current retention indexes.
+ // Always returns OK: the call is a no-op when the replica is not RUNNING, and
+ // a GC failure is only logged at ERROR level.
+ Status TabletReplica::RunLogGC() {
+   if (!CheckRunning().ok()) {
+     return Status::OK();
+   }
+   int32_t num_gced;
+   log::RetentionIndexes retention = GetRetentionIndexes();
+   const Status gc_status = log_->GC(retention, &num_gced);
+   if (!gc_status.ok()) {
+     const Status annotated =
+         gc_status.CloneAndPrepend("Unexpected error while running Log GC from TabletReplica");
+     LOG(ERROR) << annotated.ToString();
+   }
+   return Status::OK();
+ }
+
+ // Records 'status' as the replica's last human-readable status, under lock_.
+ void TabletReplica::StatusMessage(const std::string& status) {
+   std::lock_guard<simple_spinlock> guard(lock_);
+   last_status_ = status;
+ }
+
+ // Returns a copy of the last recorded status message, read under lock_.
+ string TabletReplica::last_status() const {
+   std::lock_guard<simple_spinlock> guard(lock_);
+   return last_status_;
+ }
+
+void TabletReplica::SetFailed(const Status& error) {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ CHECK(!error.ok());
+ state_ = FAILED;
+ error_ = error;
+ last_status_ = error.ToString();
+}
+
+ // Builds a human-readable description of the replica's state:
+ //   - FAILED:              "<state> (<data state>): <error>"
+ //   - data state not READY: the data state name
+ //   - otherwise:           the runtime state name
+ string TabletReplica::HumanReadableState() const {
+   std::lock_guard<simple_spinlock> guard(lock_);
+   const TabletDataState data_state = meta_->tablet_data_state();
+
+   // If failed, any number of things could have gone wrong, so show everything.
+   if (state_ == FAILED) {
+     return Substitute("$0 ($1): $2", TabletStatePB_Name(state_),
+                       TabletDataState_Name(data_state),
+                       error_.ToString());
+   }
+
+   // If it's copying, or tombstoned, that is the important thing to show.
+   if (data_state != TABLET_DATA_READY) {
+     return TabletDataState_Name(data_state);
+   }
+
+   // Otherwise, the tablet's data is in a "normal" state, so we just display
+   // the runtime state (BOOTSTRAPPING, RUNNING, etc).
+   return TabletStatePB_Name(state_);
+ }
+
+ // Appends to 'out' one TransactionStatusPB per pending transaction whose driver
+ // has a non-null state. Each entry carries the op id, operation type,
+ // description and running time; the trace buffer is included only when
+ // 'trace_type' is TRACE_TXNS.
+ void TabletReplica::GetInFlightTransactions(Transaction::TraceType trace_type,
+                                             vector<consensus::TransactionStatusPB>* out) const {
+   vector<scoped_refptr<TransactionDriver> > in_flight;
+   txn_tracker_.GetPendingTransactions(&in_flight);
+   for (const scoped_refptr<TransactionDriver>& txn_driver : in_flight) {
+     if (txn_driver->state() == nullptr) {
+       continue;
+     }
+     consensus::TransactionStatusPB txn_pb;
+     txn_pb.mutable_op_id()->CopyFrom(txn_driver->GetOpId());
+     switch (txn_driver->tx_type()) {
+       case Transaction::WRITE_TXN:
+         txn_pb.set_tx_type(consensus::WRITE_OP);
+         break;
+       case Transaction::ALTER_SCHEMA_TXN:
+         txn_pb.set_tx_type(consensus::ALTER_SCHEMA_OP);
+         break;
+     }
+     txn_pb.set_description(txn_driver->ToString());
+     const int64_t elapsed_micros =
+         (MonoTime::Now() - txn_driver->start_time()).ToMicroseconds();
+     txn_pb.set_running_for_micros(elapsed_micros);
+     if (trace_type == Transaction::TRACE_TXNS) {
+       txn_pb.set_trace_buffer(txn_driver->trace()->DumpToString());
+     }
+     out->push_back(txn_pb);
+   }
+ }
+
+ // Computes the log indexes that must be retained. Starts from the indexes
+ // reported by consensus and lowers 'for_durability' to the earliest registered
+ // log anchor and to the op index of every pending transaction that already has
+ // an op id.
+ log::RetentionIndexes TabletReplica::GetRetentionIndexes() const {
+   // Let consensus set a minimum index that should be anchored.
+   // This ensures that we:
+   // (a) don't GC any operations which are still in flight
+   // (b) don't GC any operations that are needed to catch up lagging peers.
+   log::RetentionIndexes indexes = consensus_->GetRetentionIndexes();
+
+   // If we never have written to the log, no need to proceed.
+   if (indexes.for_durability == 0) {
+     return indexes;
+   }
+
+   // Next, we interrogate the anchor registry.
+   // Returns OK if minimum known, NotFound if no anchors are registered.
+   int64_t earliest_anchor_index;
+   const Status anchor_status =
+       log_anchor_registry_->GetEarliestRegisteredLogIndex(&earliest_anchor_index);
+   if (PREDICT_FALSE(!anchor_status.ok())) {
+     DCHECK(anchor_status.IsNotFound())
+         << "Unexpected error calling LogAnchorRegistry: " << anchor_status.ToString();
+   } else {
+     indexes.for_durability = std::min(indexes.for_durability, earliest_anchor_index);
+   }
+
+   // Next, interrogate the TransactionTracker.
+   vector<scoped_refptr<TransactionDriver> > in_flight;
+   txn_tracker_.GetPendingTransactions(&in_flight);
+   for (const scoped_refptr<TransactionDriver>& txn_driver : in_flight) {
+     OpId op_id = txn_driver->GetOpId();
+     // A transaction which doesn't have an opid hasn't been submitted for
+     // replication yet and thus has no need to anchor the log.
+     if (!op_id.IsInitialized()) {
+       continue;
+     }
+     indexes.for_durability = std::min(indexes.for_durability, op_id.index());
+   }
+
+   return indexes;
+ }
+
+ // Populates 'replay_size_map' via Log::GetReplaySizeMap().
+ // Returns the CheckRunning() error, without touching the map, when the replica
+ // is not RUNNING.
+ Status TabletReplica::GetReplaySizeMap(map<int64_t, int64_t>* replay_size_map) const {
+   const Status running = CheckRunning();
+   if (!running.ok()) {
+     return running;
+   }
+   log_->GetReplaySizeMap(replay_size_map);
+   return Status::OK();
+ }
+
+ // Stores in '*retention_size' the value the log reports as GC-able for the
+ // current retention indexes. Returns the CheckRunning() error, leaving the
+ // output untouched, when the replica is not RUNNING.
+ Status TabletReplica::GetGCableDataSize(int64_t* retention_size) const {
+   const Status running = CheckRunning();
+   if (!running.ok()) {
+     return running;
+   }
+   *retention_size = log_->GetGCableDataSize(GetRetentionIndexes());
+   return Status::OK();
+ }
+
+ // Builds a REPLICA-mode transaction for the operation carried by 'round' and
+ // starts executing it asynchronously.
+ //
+ // Returns IllegalState (with the state name as message) unless the replica is
+ // RUNNING or BOOTSTRAPPING. Otherwise returns the first failure from creating
+ // the driver or from ExecuteAsync(), or OK.
+ //
+ // Only WRITE_OP and ALTER_SCHEMA_OP are handled; any other op type hits
+ // LOG(FATAL).
+ Status TabletReplica::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
+ {
+ // Only the state check is done under lock_; the rest runs unlocked.
+ std::lock_guard<simple_spinlock> lock(lock_);
+ if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
+ return Status::IllegalState(TabletStatePB_Name(state_));
+ }
+ }
+
+ consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
+ DCHECK(replicate_msg->has_timestamp());
+ gscoped_ptr<Transaction> transaction;
+ switch (replicate_msg->op_type()) {
+ case WRITE_OP:
+ {
+ DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
+ " transaction must receive a WriteRequestPB";
+ // The request id is optional on the replicate message; pass nullptr when absent.
+ unique_ptr<WriteTransactionState> tx_state(
+ new WriteTransactionState(
+ this,
+ &replicate_msg->write_request(),
+ replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
+ tx_state->SetResultTracker(result_tracker_);
+
+ transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
+ break;
+ }
+ case ALTER_SCHEMA_OP:
+ {
+ DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
+ " transaction must receive an AlterSchemaRequestPB";
+ // NOTE(review): the third constructor argument is presumably the response
+ // object, which a replica does not have -- confirm against
+ // AlterSchemaTransactionState.
+ unique_ptr<AlterSchemaTransactionState> tx_state(
+ new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(),
+ nullptr));
+ transaction.reset(
+ new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA));
+ break;
+ }
+ default:
+ LOG(FATAL) << "Unsupported Operation Type";
+ }
+
+ // TODO(todd) Look at wiring the stuff below on the driver
+ // Capture the raw state pointer now: 'transaction' is moved into the driver
+ // below, after which it can no longer be used to reach the state.
+ TransactionState* state = transaction->state();
+ state->set_consensus_round(round);
+
+ scoped_refptr<TransactionDriver> driver;
+ RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
+
+ // Unretained is required to avoid a refcount cycle.
+ state->consensus_round()->SetConsensusReplicatedCallback(
+ Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
+
+ RETURN_NOT_OK(driver->ExecuteAsync());
+ return Status::OK();
+ }
+
+ // Creates a TransactionDriver wired to this replica's tracker, consensus, log,
+ // pools and order verifier, initializes it with 'transaction' in LEADER mode,
+ // and hands it back through '*driver'. On Init() failure the error is returned
+ // and '*driver' is left untouched.
+ Status TabletReplica::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                                  scoped_refptr<TransactionDriver>* driver) {
+   scoped_refptr<TransactionDriver> new_driver(
+       new TransactionDriver(&txn_tracker_,
+                             consensus_.get(),
+                             log_.get(),
+                             prepare_pool_.get(),
+                             apply_pool_,
+                             &txn_order_verifier_));
+   RETURN_NOT_OK(new_driver->Init(std::move(transaction), consensus::LEADER));
+   driver->swap(new_driver);
+   return Status::OK();
+ }
+
+ // Creates a TransactionDriver wired to this replica's tracker, consensus, log,
+ // pools and order verifier, initializes it with 'transaction' in REPLICA mode,
+ // and hands it back through '*driver'. On Init() failure the error is returned
+ // and '*driver' is left untouched.
+ Status TabletReplica::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
+                                                   scoped_refptr<TransactionDriver>* driver) {
+   scoped_refptr<TransactionDriver> new_driver(
+       new TransactionDriver(&txn_tracker_,
+                             consensus_.get(),
+                             log_.get(),
+                             prepare_pool_.get(),
+                             apply_pool_,
+                             &txn_order_verifier_));
+   RETURN_NOT_OK(new_driver->Init(std::move(transaction), consensus::REPLICA));
+   driver->swap(new_driver);
+   return Status::OK();
+ }
+
+ // Registers this replica's maintenance operations (MRS flush, DMS flush and
+ // log GC) with 'maint_mgr', then asks the tablet to register its own ops.
+ // Does nothing except log a warning when the replica is not RUNNING.
+ // The registered ops are kept in maintenance_ops_ so that
+ // UnregisterMaintenanceOps() can unregister and delete them.
+ void TabletReplica::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
+   // Taking state_change_lock_ ensures that we don't shut down concurrently with
+   // this last start-up task.
+   std::lock_guard<simple_spinlock> l(state_change_lock_);
+
+   if (state() != RUNNING) {
+     // Identify the tablet by id. Streaming the tablet_ shared_ptr would only
+     // print a pointer value, which is useless for diagnosing which tablet
+     // was skipped.
+     LOG(WARNING) << "Not registering maintenance operations for " << tablet_id()
+                  << ": tablet not in RUNNING state";
+     return;
+   }
+
+   DCHECK(maintenance_ops_.empty());
+
+   // Each op is held in a gscoped_ptr until it has been registered, then
+   // ownership moves to maintenance_ops_.
+   gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
+   maint_mgr->RegisterOp(mrs_flush_op.get());
+   maintenance_ops_.push_back(mrs_flush_op.release());
+
+   gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
+   maint_mgr->RegisterOp(dms_flush_op.get());
+   maintenance_ops_.push_back(dms_flush_op.release());
+
+   gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
+   maint_mgr->RegisterOp(log_gc.get());
+   maintenance_ops_.push_back(log_gc.release());
+
+   tablet_->RegisterMaintenanceOps(maint_mgr);
+ }
+
+ // Unregisters every op in maintenance_ops_ and then deletes them, leaving the
+ // list empty. The caller must already hold state_change_lock_ (checked with
+ // DCHECK).
+ void TabletReplica::UnregisterMaintenanceOps() {
+   DCHECK(state_change_lock_.is_locked());
+   for (MaintenanceOp* registered_op : maintenance_ops_) {
+     registered_op->Unregister();
+   }
+   STLDeleteElements(&maintenance_ops_);
+ }
+
+ // Waits for transactions that are currently applying to commit in MVCC, then
+ // waits until the log has flushed everything queued so far.
+ //
+ // Returns the error from Log::WaitUntilAllFlushed() if it fails, OK otherwise.
+ // Either wait logs a warning if it takes longer than 200ms.
+ Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
+ // This callback is triggered prior to any TabletMetadata flush.
+ // The guarantee that we are trying to enforce is this:
+ //
+ // If an operation has been flushed to stable storage (eg a DRS or DeltaFile)
+ // then its COMMIT message must be present in the log.
+ //
+ // The purpose for this is so that, during bootstrap, we can accurately identify
+ // whether each operation has been flushed. If we don't see a COMMIT message for
+ // an operation, then we assume it was not completely applied and needs to be
+ // re-applied. Thus, if we had something on disk but with no COMMIT message,
+ // we'd attempt to double-apply the write, resulting in an error (eg trying to
+ // delete an already-deleted row).
+ //
+ // So, to enforce this property, we do two steps:
+ //
+ // 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC.
+ //
+ // Because the operations always enqueue their COMMIT message to the log
+ // before calling Commit(), this ensures that any in-flight operations have
+ // their commit messages "en route".
+ //
+ // NOTE: we only wait for those operations that have started their Apply() phase.
+ // Any operations which haven't yet started applying haven't made any changes
+ // to in-memory state: thus, they obviously couldn't have made any changes to
+ // on-disk storage either (data can only get to the disk by going through an in-memory
+ // store). Only those that have started Apply() could have potentially written some
+ // data which is now on disk.
+ //
+ // Perhaps more importantly, if we waited on operations that hadn't started their
+ // Apply() phase, we might be waiting forever -- for example, if a follower has been
+ // partitioned from its leader, it may have operations sitting around in flight
+ // for quite a long time before eventually aborting or committing. This would
+ // end up blocking all flushes if we waited on it.
+ //
+ // 2) Flush the log
+ //
+ // This ensures that the above-mentioned commit messages are not just enqueued
+ // to the log, but also on disk.
+ VLOG(1) << "T " << tablet_->metadata()->tablet_id()
+ << ": Waiting for in-flight transactions to commit.";
+ // Step 1: wait for applying transactions to commit.
+ LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") {
+ tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit();
+ }
+ VLOG(1) << "T " << tablet_->metadata()->tablet_id()
+ << ": Waiting for the log queue to be flushed.";
+ // Step 2: wait for the log to flush; a failure here is returned to the caller.
+ LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") {
+ RETURN_NOT_OK(log_->WaitUntilAllFlushed());
+ }
+ return Status::OK();
+ }
+
+
+} // namespace tablet
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
new file mode 100644
index 0000000..7f207cb
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica.h
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_TABLET_TABLET_REPLICA_H_
+#define KUDU_TABLET_TABLET_REPLICA_H_
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "kudu/consensus/consensus.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/tablet.h"
+#include "kudu/tablet/transaction_order_verifier.h"
+#include "kudu/tablet/transactions/transaction_tracker.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/semaphore.h"
+
+namespace kudu {
+
+namespace log {
+class LogAnchorRegistry;
+}
+
+namespace rpc {
+class Messenger;
+class ResultTracker;
+}
+
+namespace tserver {
+class CatchUpServiceTest;
+}
+
+class MaintenanceManager;
+class MaintenanceOp;
+
+namespace tablet {
+class LeaderTransactionDriver;
+class ReplicaTransactionDriver;
+class TabletReplica;
+class TabletStatusPB;
+class TabletStatusListener;
+class TransactionDriver;
+
+ // Callback interface through which tablet-related processes report their
+ // status back to TabletReplica. Having it as a separate interface avoids a
+ // circular class dependency and lets those other classes be tested without
+ // constructing a TabletReplica.
+ class TabletStatusListener {
+  public:
+   virtual ~TabletStatusListener() = default;
+
+   // Reports a new human-readable status message.
+   virtual void StatusMessage(const std::string& status) = 0;
+ };
+
+ // A replica in a tablet consensus configuration, which coordinates writes to tablets.
+ // Each time Write() is called this class appends a new entry to a replicated
+ // state machine through a consensus algorithm, which makes sure that other
+ // peers see the same updates in the same order. In addition to this, this
+ // class also splits the work and coordinates multi-threaded execution.
+ //
+ // The class is reference counted (RefCountedThreadSafe) and its destructor is
+ // private, so instances are destroyed through the reference count rather than
+ // deleted directly.
+ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
+ public consensus::ReplicaTransactionFactory,
+ public TabletStatusListener {
+ public:
+ TabletReplica(const scoped_refptr<TabletMetadata>& meta,
+ const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool,
+ Callback<void(const std::string& reason)> mark_dirty_clbk);
+
+ // Initializes the TabletReplica, namely creating the Log and initializing
+ // Consensus.
+ Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
+ const scoped_refptr<server::Clock>& clock,
+ const std::shared_ptr<rpc::Messenger>& messenger,
+ const scoped_refptr<rpc::ResultTracker>& result_tracker,
+ const scoped_refptr<log::Log>& log,
+ const scoped_refptr<MetricEntity>& metric_entity);
+
+ // Starts the TabletReplica, making it available for Write()s. If this
+ // TabletReplica is part of a consensus configuration this will connect it to other replicas
+ // in the consensus configuration.
+ Status Start(const consensus::ConsensusBootstrapInfo& info);
+
+ // Shutdown this tablet replica.
+ // If a shutdown is already in progress, blocks until that shutdown is complete.
+ void Shutdown();
+
+ // Check that the tablet is in a RUNNING state.
+ // Returns IllegalState naming the current state otherwise.
+ Status CheckRunning() const;
+
+ // Waits until the replica is RUNNING and its consensus instance is running.
+ // Returns IllegalState if the replica is quiescing or shut down, and TimedOut
+ // if 'timeout' elapses first.
+ // TODO have a way to wait for any state?
+ Status WaitUntilConsensusRunning(const MonoDelta& timeout);
+
+ // Submits a write to a tablet and executes it asynchronously.
+ // The caller is expected to build and pass a WriteTransactionState that points
+ // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
+ // MvccManager.
+ Status SubmitWrite(std::unique_ptr<WriteTransactionState> tx_state);
+
+ // Called by the tablet service to start an alter schema transaction.
+ //
+ // The transaction contains all the information required to execute the
+ // AlterSchema operation and send the response back.
+ //
+ // If the returned Status is OK, the response to the client will be sent
+ // asynchronously. Otherwise the tablet service will have to send the response directly.
+ //
+ // The AlterSchema operation is taking the tablet component lock in exclusive mode
+ // meaning that no other operation on the tablet can be executed while the
+ // AlterSchema is in progress.
+ Status SubmitAlterSchema(std::unique_ptr<AlterSchemaTransactionState> tx_state);
+
+ // Fills 'status_pb_out' (must not be null) with the replica's tablet id, table
+ // name, last status, partition, state, data state and, when a tablet is
+ // attached, its estimated on-disk size.
+ void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
+
+ // Used by consensus to create and start a new ReplicaTransaction.
+ virtual Status StartReplicaTransaction(
+ const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
+
+ // Returns a raw pointer to the consensus instance, read under lock_.
+ // The pointer is not kept alive by this call; use shared_consensus() to hold
+ // a reference.
+ consensus::Consensus* consensus() {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return consensus_.get();
+ }
+
+ // Returns a new reference to the consensus instance, read under lock_.
+ scoped_refptr<consensus::Consensus> shared_consensus() const {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return consensus_;
+ }
+
+ // Returns a raw pointer to the tablet, read under lock_.
+ // The pointer is not kept alive by this call; use shared_tablet() to share
+ // ownership.
+ Tablet* tablet() const {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return tablet_.get();
+ }
+
+ // Returns the TimeManager owned by consensus.
+ // NOTE(review): unlike the accessors above, this dereferences consensus_
+ // without taking lock_ and without a null check -- confirm callers only use
+ // it while consensus_ is set.
+ scoped_refptr<consensus::TimeManager> time_manager() const {
+ return consensus_->time_manager();
+ }
+
+ // Returns a shared owner of the tablet, read under lock_.
+ std::shared_ptr<Tablet> shared_tablet() const {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return tablet_;
+ }
+
+ // Returns the current runtime state, read under lock_.
+ const TabletStatePB state() const {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return state_;
+ }
+
+ // Returns the current Raft configuration.
+ const consensus::RaftConfigPB RaftConfig() const;
+
+ // If any peers in the consensus configuration lack permanent uuids, get them via an
+ // RPC call and update.
+ // TODO: move this to raft_consensus.h.
+ Status UpdatePermanentUuids();
+
+ // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
+ // The replica must currently be NOT_STARTED (enforced with CHECK_EQ).
+ void SetBootstrapping() {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ CHECK_EQ(NOT_STARTED, state_);
+ state_ = BOOTSTRAPPING;
+ }
+
+ // Implementation of TabletStatusListener::StatusMessage().
+ void StatusMessage(const std::string& status) override;
+
+ // Retrieve the last human-readable status of this tablet replica.
+ std::string last_status() const;
+
+ // Sets the tablet state to FAILED additionally setting the error to the provided
+ // one.
+ void SetFailed(const Status& error);
+
+ // Returns the error that occurred, when state is FAILED.
+ Status error() const {
+ std::lock_guard<simple_spinlock> lock(lock_);
+ return error_;
+ }
+
+ // Returns a human-readable string indicating the state of the tablet.
+ // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING",
+ // etc. For use in places like the Web UI.
+ std::string HumanReadableState() const;
+
+ // Adds list of transactions in-flight at the time of the call to 'out'.
+ void GetInFlightTransactions(Transaction::TraceType trace_type,
+ std::vector<consensus::TransactionStatusPB>* out) const;
+
+ // Returns the log indexes to be retained for durability and to catch up peers.
+ // Used for selection of log segments to delete during Log GC.
+ log::RetentionIndexes GetRetentionIndexes() const;
+
+ // See Log::GetReplaySizeMap(...).
+ //
+ // Returns a non-ok status if the tablet isn't running.
+ Status GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size_map) const;
+
+ // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
+ //
+ // Returns a non-ok status if the tablet isn't running.
+ Status GetGCableDataSize(int64_t* retention_size) const;
+
+ // Return a pointer to the Log.
+ // TabletReplica keeps a reference to Log after Init().
+ log::Log* log() const {
+ return log_.get();
+ }
+
+ // Returns a raw pointer to the clock held by this replica.
+ server::Clock* clock() {
+ return clock_.get();
+ }
+
+ // Returns the log anchor registry held by this replica.
+ const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry() const {
+ return log_anchor_registry_;
+ }
+
+ // Returns the tablet_id of the tablet managed by this TabletReplica.
+ // Returns the correct tablet_id even if the underlying tablet is not available
+ // yet.
+ const std::string& tablet_id() const { return tablet_id_; }
+
+ // Convenience method to return the permanent_uuid of this peer.
+ // NOTE(review): dereferences tablet_ without taking lock_ and without a null
+ // check -- confirm callers only use it while tablet_ is set.
+ std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
+
+ // Creates a TransactionDriver for 'transaction', initialized in LEADER mode,
+ // and returns it through 'driver'.
+ Status NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
+ scoped_refptr<TransactionDriver>* driver);
+
+ // Creates a TransactionDriver for 'transaction', initialized in REPLICA mode,
+ // and returns it through 'driver'.
+ Status NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
+ scoped_refptr<TransactionDriver>* driver);
+
+ // Tells the tablet's log to garbage collect.
+ Status RunLogGC();
+
+ // Register the maintenance ops associated with this peer's tablet, also invokes
+ // Tablet::RegisterMaintenanceOps().
+ void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
+
+ // Unregister the maintenance ops associated with this peer's tablet.
+ // This method is not thread safe.
+ void UnregisterMaintenanceOps();
+
+ // Return pointer to the transaction tracker for this peer.
+ const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
+
+ // Returns the tablet metadata this replica was constructed with.
+ const scoped_refptr<TabletMetadata>& tablet_metadata() const {
+ return meta_;
+ }
+
+ // Marks the tablet as dirty so that it's included in the next heartbeat.
+ void MarkTabletDirty(const std::string& reason) {
+ mark_dirty_clbk_.Run(reason);
+ }
+
+ private:
+ friend class RefCountedThreadSafe<TabletReplica>;
+ friend class TabletReplicaTest;
+ FRIEND_TEST(TabletReplicaTest, TestMRSAnchorPreventsLogGC);
+ FRIEND_TEST(TabletReplicaTest, TestDMSAnchorPreventsLogGC);
+ FRIEND_TEST(TabletReplicaTest, TestActiveTransactionPreventsLogGC);
+
+ // Private: destruction happens through RefCountedThreadSafe (a friend above).
+ ~TabletReplica();
+
+ // Wait until the TabletReplica is fully in SHUTDOWN state.
+ void WaitUntilShutdown();
+
+ // After bootstrap is complete and consensus is setup this initiates the transactions
+ // that were not complete on bootstrap.
+ // Not implemented yet. See .cc file.
+ Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
+ const consensus::ConsensusBootstrapInfo& bootstrap_info);
+
+ // Metadata of the tablet this replica manages.
+ const scoped_refptr<TabletMetadata> meta_;
+
+ // Id of the managed tablet; available even before tablet_ is set.
+ const std::string tablet_id_;
+
+ const consensus::RaftPeerPB local_peer_pb_;
+
+ // Runtime state and, when FAILED, the error that caused it. See lock_.
+ TabletStatePB state_;
+ Status error_;
+ TransactionTracker txn_tracker_;
+ TransactionOrderVerifier txn_order_verifier_;
+ scoped_refptr<log::Log> log_;
+ std::shared_ptr<Tablet> tablet_;
+ std::shared_ptr<rpc::Messenger> messenger_;
+ scoped_refptr<consensus::Consensus> consensus_;
+ simple_spinlock prepare_replicate_lock_;
+
+ // Lock protecting state_, last_status_, as well as smart pointers to collaborating
+ // classes such as tablet_ and consensus_.
+ mutable simple_spinlock lock_;
+
+ // The human-readable last status of the tablet, displayed on the web page, command line
+ // tools, etc.
+ std::string last_status_;
+
+ // Lock taken during Init/Shutdown which ensures that only a single thread
+ // attempts to perform major lifecycle operations (Init/Shutdown) at once.
+ // This must be acquired before acquiring lock_ if they are acquired together.
+ // We don't just use lock_ since the lifecycle operations may take a while
+ // and we'd like other threads to be able to quickly poll the state_ variable
+ // during them in order to reject RPCs, etc.
+ mutable simple_spinlock state_change_lock_;
+
+ // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_'
+ // is single-threaded, moving to a multi-tablet setup where multiple TabletReplicas
+ // use the same 'prepare_pool_' needs to enforce that, for a single
+ // TabletReplica, PrepareTasks are executed *serially*.
+ // TODO move the prepare pool to TabletServer.
+ gscoped_ptr<ThreadPool> prepare_pool_;
+
+ // Pool that executes apply tasks for transactions. This is a multi-threaded
+ // pool, constructor-injected by either the Master (for system tables) or
+ // the Tablet server.
+ ThreadPool* apply_pool_;
+
+ scoped_refptr<server::Clock> clock_;
+
+ scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
+
+ // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager.
+ //
+ // Must be called whenever cluster membership or leadership changes, or when
+ // the tablet's schema changes.
+ Callback<void(const std::string& reason)> mark_dirty_clbk_;
+
+ // List of maintenance operations for the tablet that need information that only the peer
+ // can provide.
+ // The raw pointers are owned by this list: they are deleted in
+ // UnregisterMaintenanceOps().
+ std::vector<MaintenanceOp*> maintenance_ops_;
+
+ // The result tracker for writes.
+ scoped_refptr<rpc::ResultTracker> result_tracker_;
+
+ DISALLOW_COPY_AND_ASSIGN(TabletReplica);
+ };
+
+ // Callback that waits for in-flight transactions to complete and then flushes
+ // the Log.
+ //
+ // The tablet is held as a raw pointer because this callback is set in
+ // TabletMetadata: holding the tablet as a shared_ptr would create a circular
+ // dependency (callback->tablet->metadata->callback). Since the tablet
+ // indirectly owns this callback, we know that it must still be alive when the
+ // callback fires.
+ class FlushInflightsToLogCallback : public RefCountedThreadSafe<FlushInflightsToLogCallback> {
+  public:
+   FlushInflightsToLogCallback(Tablet* tablet, const scoped_refptr<log::Log>& log)
+       : tablet_(tablet), log_(log) {}
+
+   // Waits for applying transactions to commit, then for the log to be flushed.
+   Status WaitForInflightsAndFlushLog();
+
+  private:
+   // Not owned; see the class comment.
+   Tablet* tablet_;
+   // Reference to the log that gets flushed.
+   scoped_refptr<log::Log> log_;
+ };
+
+
+} // namespace tablet
+} // namespace kudu
+
+#endif /* KUDU_TABLET_TABLET_REPLICA_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
new file mode 100644
index 0000000..85650ff
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tablet/tablet_replica_mm_ops.h"
+
+#include <algorithm>
+#include <gflags/gflags.h>
+#include <map>
+#include <mutex>
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/metrics.h"
+
+// Size threshold, in MB, read by FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush()
+// to decide when a flush is scored by how much memory it anchors.
+DEFINE_int32(flush_threshold_mb, 1024,
+             "Size at which MemRowSet flushes are triggered. "
+             "A MRS can still flush below this threshold if it hasn't flushed in a while, "
+             "or if the server-wide memory limit has been reached.");
+TAG_FLAG(flush_threshold_mb, experimental);
+
+// Age threshold, in seconds, read by the same policy to decide when a flush becomes
+// eligible purely because of the time elapsed since the previous flush.
+DEFINE_int32(flush_threshold_secs, 2 * 60,
+             "Number of seconds after which a non-empty MemRowSet will become flushable "
+             "even if it is not large.");
+TAG_FLAG(flush_threshold_secs, experimental);
+
+
+// Tablet-level metrics for log GC. Both are instantiated in the LogGCOp constructor.
+METRIC_DEFINE_gauge_uint32(tablet, log_gc_running,
+ "Log GCs Running",
+ kudu::MetricUnit::kOperations,
+ "Number of log GC operations currently running.");
+// NOTE(review): the trailing '60000LU, 1' arguments are presumably the histogram's
+// maximum trackable value and its number of significant digits -- confirm against the
+// METRIC_DEFINE_histogram macro.
+METRIC_DEFINE_histogram(tablet, log_gc_duration,
+ "Log GC Duration",
+ kudu::MetricUnit::kMilliseconds,
+ "Time spent garbage collecting the logs.", 60000LU, 1);
+
+namespace kudu {
+namespace tablet {
+
+using std::map;
+using strings::Substitute;
+
+// Upper bound (one hour, expressed in milliseconds) for how long it takes to reach
+// "full perf improvement" in time-based flushing.
+const double kFlushUpperBoundMs = 60 * 60 * 1000;
+
+//
+// FlushOpPerfImprovementPolicy.
+//
+
+// Scores a flush operation and records the score in 'stats' via set_perf_improvement().
+//
+// 'stats' must already carry the op's anchored RAM (read through ram_anchored());
+// 'elapsed_ms' is the number of milliseconds since the op last flushed. When neither the
+// size threshold nor the age threshold is exceeded, 'stats' is left unchanged.
+void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats,
+                                                              double elapsed_ms) {
+  // Do the byte arithmetic in 64 bits: 'FLAGS_flush_threshold_mb * 1024 * 1024' overflows
+  // a 32-bit int as soon as the flag is set to 2048 or more.
+  const int64_t kBytesPerMb = 1024 * 1024;
+  const int64_t threshold_bytes = static_cast<int64_t>(FLAGS_flush_threshold_mb) * kBytesPerMb;
+  const int64_t anchored_bytes = static_cast<int64_t>(stats->ram_anchored());
+
+  if (anchored_bytes > threshold_bytes) {
+    // If we're over the user-specified flush threshold, then consider the perf
+    // improvement to be 1 for every extra MB. This produces perf_improvement results
+    // which are much higher than any compaction would produce, and means that, when
+    // there is an MRS over threshold, a flush will almost always be selected instead of
+    // a compaction. That's not necessarily a good thing, but in the absence of better
+    // heuristics, it will do for now.
+    //
+    // The excess is (anchored - threshold), taken in bytes and only then converted to MB,
+    // so the score is positive and keeps its fractional part.
+    const double extra_mb =
+        static_cast<double>(anchored_bytes - threshold_bytes) / kBytesPerMb;
+    stats->set_perf_improvement(extra_mb);
+  } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000.0) {
+    // Even if we aren't over the threshold, consider flushing if we haven't flushed
+    // in a long time. But, don't give it a large perf_improvement score. We should
+    // only do this if we really don't have much else to do, and if we've already waited a bit.
+    // The following will give an improvement that's between 0.0 and 1.0, gradually growing
+    // as 'elapsed_ms' approaches 'kFlushUpperBoundMs'.
+    //
+    // The seconds-to-milliseconds conversion above is done in floating point so that a
+    // large flag value cannot overflow 32-bit integer arithmetic.
+    stats->set_perf_improvement(std::min(elapsed_ms / kFlushUpperBoundMs, 1.0));
+  }
+}
+
+//
+// FlushMRSOp.
+//
+
+// Refreshes 'stats' for the MRS flush op: runnability, anchored RAM, retained log bytes
+// and the perf-improvement score. 'stats' is left untouched when the MemRowSet is empty
+// or when the replay size map cannot be obtained.
+void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+
+  // Nothing to report for an empty MRS.
+  if (tablet_replica_->tablet()->MemRowSetEmpty()) {
+    return;
+  }
+
+  map<int64_t, int64_t> replay_sizes;
+  if (!tablet_replica_->GetReplaySizeMap(&replay_sizes).ok()) {
+    return;
+  }
+
+  {
+    // Probe the flush semaphore: the op is runnable only if the semaphore can be taken
+    // right now. The unique_lock gives it back when this scope ends.
+    std::unique_lock<Semaphore> probe(tablet_replica_->tablet()->rowsets_flush_sem_,
+                                      std::defer_lock);
+    const bool can_flush_now = probe.try_lock();
+    stats->set_runnable(can_flush_now);
+  }
+
+  const auto mrs_bytes = tablet_replica_->tablet()->MemRowSetSize();
+  stats->set_ram_anchored(mrs_bytes);
+
+  const auto retained_log_bytes =
+      tablet_replica_->tablet()->MemRowSetLogReplaySize(replay_sizes);
+  stats->set_logs_retained_bytes(retained_log_bytes);
+
+  // TODO(todd): use workload statistics here to find out how "hot" the tablet has
+  // been in the last 5 minutes.
+  const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
+}
+
+// Claims the tablet's rowsets_flush_sem_ for the upcoming Perform(). Returns false,
+// failing the Prepare step, when the semaphore cannot be taken; as a consequence only
+// one instance of FlushMRSOp can be running at once.
+bool FlushMRSOp::Prepare() {
+  const bool acquired = tablet_replica_->tablet()->rowsets_flush_sem_.try_lock();
+  return acquired;
+}
+
+// Flushes the tablet's MemRowSet. Expects rowsets_flush_sem_ to be held already (it is
+// taken in Prepare()) and releases it before returning.
+void FlushMRSOp::Perform() {
+ // The semaphore must already be held, so an extra try_lock() here has to fail.
+ CHECK(!tablet_replica_->tablet()->rowsets_flush_sem_.try_lock());
+
+ // NOTE(review): KUDU_CHECK_OK_PREPEND presumably aborts on a non-OK status, prefixing
+ // the given message -- confirm against the macro's definition.
+ KUDU_CHECK_OK_PREPEND(tablet_replica_->tablet()->FlushUnlocked(),
+ Substitute("FlushMRS failed on $0", tablet_replica_->tablet_id()));
+
+ {
+ // Restart the time-since-flush stopwatch under lock_.
+ std::lock_guard<simple_spinlock> l(lock_);
+ time_since_flush_.start();
+ }
+ // Release the semaphore acquired in Prepare().
+ tablet_replica_->tablet()->rowsets_flush_sem_.unlock();
+}
+
+// Returns the flush_mrs_duration histogram held by the tablet's metrics.
+scoped_refptr<Histogram> FlushMRSOp::DurationHistogram() const {
+  const auto& tablet_metrics = tablet_replica_->tablet()->metrics();
+  return tablet_metrics->flush_mrs_duration;
+}
+
+// Returns the flush_mrs_running gauge held by the tablet's metrics.
+scoped_refptr<AtomicGauge<uint32_t> > FlushMRSOp::RunningGauge() const {
+  const auto& tablet_metrics = tablet_replica_->tablet()->metrics();
+  return tablet_metrics->flush_mrs_running;
+}
+
+//
+// FlushDeltaMemStoresOp.
+//
+
+// Refreshes 'stats' for the DMS flush op using the sizes reported by
+// GetInfoForBestDMSToFlush(). 'stats' is left untouched when DeltaMemRowSetEmpty()
+// reports true or when the replay size map cannot be obtained.
+void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+
+  if (tablet_replica_->tablet()->DeltaMemRowSetEmpty()) {
+    return;
+  }
+
+  map<int64_t, int64_t> replay_sizes;
+  if (!tablet_replica_->GetReplaySizeMap(&replay_sizes).ok()) {
+    return;
+  }
+
+  // Both values are filled in by the call below.
+  int64_t best_dms_bytes;
+  int64_t retained_log_bytes;
+  tablet_replica_->tablet()->GetInfoForBestDMSToFlush(replay_sizes,
+                                                      &best_dms_bytes,
+                                                      &retained_log_bytes);
+
+  stats->set_ram_anchored(best_dms_bytes);
+  stats->set_runnable(true);
+  stats->set_logs_retained_bytes(retained_log_bytes);
+
+  const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
+  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
+}
+
+// Flushes one DMS via FlushDMSWithHighestRetention() and restarts the time-since-flush
+// stopwatch. Logs a warning and does nothing else when the replay size map cannot be
+// obtained.
+void FlushDeltaMemStoresOp::Perform() {
+  map<int64_t, int64_t> replay_sizes;
+  const bool have_replay_sizes = tablet_replica_->GetReplaySizeMap(&replay_sizes).ok();
+  if (!have_replay_sizes) {
+    LOG(WARNING) << "Won't flush deltas since tablet shutting down: "
+                 << tablet_replica_->tablet_id();
+    return;
+  }
+
+  KUDU_CHECK_OK_PREPEND(
+      tablet_replica_->tablet()->FlushDMSWithHighestRetention(replay_sizes),
+      Substitute("Failed to flush DMS on $0", tablet_replica_->tablet()->tablet_id()));
+
+  {
+    // The stopwatch is only touched while lock_ is held.
+    std::lock_guard<simple_spinlock> guard(lock_);
+    time_since_flush_.start();
+  }
+}
+
+// Returns the flush_dms_duration histogram held by the tablet's metrics.
+scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
+  const auto& tablet_metrics = tablet_replica_->tablet()->metrics();
+  return tablet_metrics->flush_dms_duration;
+}
+
+// Returns the flush_dms_running gauge held by the tablet's metrics.
+scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() const {
+  const auto& tablet_metrics = tablet_replica_->tablet()->metrics();
+  return tablet_metrics->flush_dms_running;
+}
+
+//
+// LogGCOp.
+//
+
+// Names the op "LogGCOp(<tablet id>)" with LOW_IO_USAGE, instantiates the log GC duration
+// histogram and running gauge on the tablet's metric entity, and initializes the
+// semaphore to 1 (the value UpdateStats() treats as "runnable").
+LogGCOp::LogGCOp(TabletReplica* tablet_replica)
+ : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::LOW_IO_USAGE),
+ tablet_replica_(tablet_replica),
+ log_gc_duration_(METRIC_log_gc_duration.Instantiate(
+ tablet_replica->tablet()->GetMetricEntity())),
+ log_gc_running_(METRIC_log_gc_running.Instantiate(
+ tablet_replica->tablet()->GetMetricEntity(), 0)),
+ sem_(1) {}
+
+// Reports the GC-able log size as the retained log bytes and marks the op runnable
+// only while the semaphore's value is 1. 'stats' is left untouched when the GC-able
+// size cannot be obtained.
+void LogGCOp::UpdateStats(MaintenanceOpStats* stats) {
+  int64_t gcable_bytes;
+  const bool have_size = tablet_replica_->GetGCableDataSize(&gcable_bytes).ok();
+  if (!have_size) {
+    return;
+  }
+
+  stats->set_logs_retained_bytes(gcable_bytes);
+
+  const bool sem_available = (sem_.GetValue() == 1);
+  stats->set_runnable(sem_available);
+}
+
+// Tries to take sem_ for the upcoming Perform(); returns whether that succeeded.
+bool LogGCOp::Prepare() {
+  const bool acquired = sem_.try_lock();
+  return acquired;
+}
+
+// Runs log GC on the replica. Expects sem_ to be held already (it is taken in Prepare())
+// and releases it before returning.
+void LogGCOp::Perform() {
+ // The semaphore must already be held, so an extra try_lock() here has to fail.
+ CHECK(!sem_.try_lock());
+
+ tablet_replica_->RunLogGC();
+
+ // Release the semaphore acquired in Prepare().
+ sem_.unlock();
+}
+
+// Returns the log GC duration histogram instantiated in the constructor.
+scoped_refptr<Histogram> LogGCOp::DurationHistogram() const {
+ return log_gc_duration_;
+}
+
+// Returns the log GC running gauge instantiated in the constructor.
+scoped_refptr<AtomicGauge<uint32_t> > LogGCOp::RunningGauge() const {
+ return log_gc_running_;
+}
+
+} // namespace tablet
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_replica_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.h b/src/kudu/tablet/tablet_replica_mm_ops.h
new file mode 100644
index 0000000..fc391ed
--- /dev/null
+++ b/src/kudu/tablet/tablet_replica_mm_ops.h
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_
+#define KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_
+
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/stopwatch.h"
+
+namespace kudu {
+
+class Histogram;
+template<class T>
+class AtomicGauge;
+
+namespace tablet {
+
+// Holder for the static flush-scoring policy shared by the flush maintenance ops.
+// The constructor is private, so the class is used only through its static method.
+class FlushOpPerfImprovementPolicy {
+ public:
+ ~FlushOpPerfImprovementPolicy() {}
+
+ // Sets the performance improvement based on the anchored ram if it's over the threshold,
+ // else it will set it based on how long it has been since the last flush.
+ // 'elapsed_ms' is the time since the last flush, in milliseconds.
+ static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms);
+
+ private:
+ // Private: not meant to be instantiated by callers.
+ FlushOpPerfImprovementPolicy() {}
+};
+
+// Maintenance op for MRS flush. Only one can happen at a time.
+class FlushMRSOp : public MaintenanceOp {
+ public:
+ // Names the op "FlushMRSOp(<tablet id>)", registers it as HIGH_IO_USAGE and starts
+ // the time-since-flush stopwatch.
+ explicit FlushMRSOp(TabletReplica* tablet_replica)
+ : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE),
+ tablet_replica_(tablet_replica) {
+ time_since_flush_.start();
+ }
+
+ // Fills 'stats' with the op's runnability, anchored RAM, retained log bytes and
+ // perf-improvement score.
+ virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+ // Tries to acquire the tablet's rowsets_flush_sem_; returns false if it cannot.
+ virtual bool Prepare() OVERRIDE;
+
+ // Performs the flush and releases the semaphore acquired in Prepare().
+ virtual void Perform() OVERRIDE;
+
+ virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+ // Lock protecting time_since_flush_.
+ mutable simple_spinlock lock_;
+ // Started in the constructor and restarted at the end of each Perform().
+ Stopwatch time_since_flush_;
+
+ // The replica whose tablet this op flushes. Stored as a raw pointer.
+ TabletReplica *const tablet_replica_;
+};
+
+// Maintenance op for DMS flush.
+// Reports stats for all the DMS this tablet contains but only flushes one in Perform().
+class FlushDeltaMemStoresOp : public MaintenanceOp {
+ public:
+ // Names the op "FlushDeltaMemStoresOp(<tablet id>)", registers it as HIGH_IO_USAGE and
+ // starts the time-since-flush stopwatch.
+ explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
+ : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
+ tablet_replica->tablet()->tablet_id().c_str()),
+ MaintenanceOp::HIGH_IO_USAGE),
+ tablet_replica_(tablet_replica) {
+ time_since_flush_.start();
+ }
+
+ // Fills 'stats' with anchored RAM, retained log bytes and the perf-improvement score.
+ virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+ // Always succeeds: unlike FlushMRSOp, no semaphore is taken here.
+ virtual bool Prepare() OVERRIDE {
+ return true;
+ }
+
+ // Flushes a single DMS and restarts the time-since-flush stopwatch.
+ virtual void Perform() OVERRIDE;
+
+ virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+ // Lock protecting time_since_flush_
+ mutable simple_spinlock lock_;
+ // Started in the constructor and restarted at the end of a successful Perform().
+ Stopwatch time_since_flush_;
+
+ // The replica whose tablet this op flushes. Stored as a raw pointer.
+ TabletReplica *const tablet_replica_;
+};
+
+// Maintenance task that runs log GC. Reports log retention that represents the amount of data
+// that can be GC'd.
+//
+// Only one LogGC op can run at a time.
+class LogGCOp : public MaintenanceOp {
+ public:
+ // Defined in the .cc file; sets up the op name, the metrics and the semaphore.
+ explicit LogGCOp(TabletReplica* tablet_replica);
+
+ // Fills 'stats' with the GC-able log size and whether the op can currently run.
+ virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
+
+ // Tries to acquire sem_; returns false if it is already held.
+ virtual bool Prepare() OVERRIDE;
+
+ // Runs log GC on the replica and releases sem_.
+ virtual void Perform() OVERRIDE;
+
+ virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
+
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
+
+ private:
+ // The replica whose log is garbage collected. Stored as a raw pointer.
+ TabletReplica *const tablet_replica_;
+ // Metrics returned by DurationHistogram() and RunningGauge() respectively.
+ scoped_refptr<Histogram> log_gc_duration_;
+ scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
+ // Taken in Prepare() and released at the end of Perform().
+ mutable Semaphore sem_;
+};
+
+} // namespace tablet
+} // namespace kudu
+
+#endif /* KUDU_TABLET_TABLET_REPLICA_MM_OPS_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/alter_schema_transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.cc b/src/kudu/tablet/transactions/alter_schema_transaction.cc
index 92a626d..38d2c56 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.cc
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.cc
@@ -23,7 +23,7 @@
#include "kudu/rpc/rpc_context.h"
#include "kudu/server/hybrid_clock.h"
#include "kudu/tablet/tablet.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/pb_util.h"
@@ -86,7 +86,7 @@ Status AlterSchemaTransaction::Prepare() {
return s;
}
- Tablet* tablet = state_->tablet_peer()->tablet();
+ Tablet* tablet = state_->tablet_replica()->tablet();
RETURN_NOT_OK(tablet->CreatePreparedAlterSchema(state(), schema.get()));
state_->AddToAutoReleasePool(schema.release());
@@ -106,15 +106,15 @@ Status AlterSchemaTransaction::Start() {
Status AlterSchemaTransaction::Apply(gscoped_ptr<CommitMsg>* commit_msg) {
TRACE("APPLY ALTER-SCHEMA: Starting");
- Tablet* tablet = state_->tablet_peer()->tablet();
+ Tablet* tablet = state_->tablet_replica()->tablet();
RETURN_NOT_OK(tablet->AlterSchema(state()));
- state_->tablet_peer()->log()
+ state_->tablet_replica()->log()
->SetSchemaForNextLogSegment(*DCHECK_NOTNULL(state_->schema()),
state_->schema_version());
// Altered tablets should be included in the next tserver heartbeat so that
// clients waiting on IsAlterTableDone() are unblocked promptly.
- state_->tablet_peer()->MarkTabletDirty("Alter schema finished");
+ state_->tablet_replica()->MarkTabletDirty("Alter schema finished");
commit_msg->reset(new CommitMsg());
(*commit_msg)->set_op_type(ALTER_SCHEMA_OP);
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/alter_schema_transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/alter_schema_transaction.h b/src/kudu/tablet/transactions/alter_schema_transaction.h
index cfcae3f..e1e0ab9 100644
--- a/src/kudu/tablet/transactions/alter_schema_transaction.h
+++ b/src/kudu/tablet/transactions/alter_schema_transaction.h
@@ -42,10 +42,10 @@ class AlterSchemaTransactionState : public TransactionState {
~AlterSchemaTransactionState() {
}
- AlterSchemaTransactionState(TabletPeer* tablet_peer,
+ AlterSchemaTransactionState(TabletReplica* tablet_replica,
const tserver::AlterSchemaRequestPB* request,
tserver::AlterSchemaResponsePB* response)
- : TransactionState(tablet_peer),
+ : TransactionState(tablet_replica),
schema_(NULL),
request_(request),
response_(response) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.cc b/src/kudu/tablet/transactions/transaction.cc
index b8681a1..efa6688 100644
--- a/src/kudu/tablet/transactions/transaction.cc
+++ b/src/kudu/tablet/transactions/transaction.cc
@@ -28,8 +28,8 @@ Transaction::Transaction(TransactionState* state, DriverType type, TransactionTy
tx_type_(tx_type) {
}
-TransactionState::TransactionState(TabletPeer* tablet_peer)
- : tablet_peer_(tablet_peer),
+TransactionState::TransactionState(TabletReplica* tablet_replica)
+ : tablet_replica_(tablet_replica),
completion_clbk_(new TransactionCompletionCallback()),
timestamp_error_(0),
arena_(1024, 4 * 1024 * 1024),
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction.h b/src/kudu/tablet/transactions/transaction.h
index 4d8b4d5..f0696a6 100644
--- a/src/kudu/tablet/transactions/transaction.h
+++ b/src/kudu/tablet/transactions/transaction.h
@@ -37,7 +37,7 @@ class ResultTracker;
} // namespace rpc
namespace tablet {
-class TabletPeer;
+class TabletReplica;
class TransactionCompletionCallback;
class TransactionState;
@@ -169,8 +169,8 @@ class TransactionState {
return consensus_round_.get();
}
- TabletPeer* tablet_peer() const {
- return tablet_peer_;
+ TabletReplica* tablet_replica() const {
+ return tablet_replica_;
}
// Return metrics related to this transaction.
@@ -257,13 +257,13 @@ class TransactionState {
}
protected:
- explicit TransactionState(TabletPeer* tablet_peer);
+ explicit TransactionState(TabletReplica* tablet_replica);
virtual ~TransactionState();
TransactionMetrics tx_metrics_;
- // The tablet peer that is coordinating this transaction.
- TabletPeer* const tablet_peer_;
+ // The TabletReplica that is coordinating this transaction.
+ TabletReplica* const tablet_replica_;
// The result tracker that will cache the result of this transaction.
scoped_refptr<rpc::ResultTracker> result_tracker_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_driver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_driver.cc b/src/kudu/tablet/transactions/transaction_driver.cc
index 3050f34..cb6bf64 100644
--- a/src/kudu/tablet/transactions/transaction_driver.cc
+++ b/src/kudu/tablet/transactions/transaction_driver.cc
@@ -23,7 +23,7 @@
#include "kudu/consensus/time_manager.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/rpc/result_tracker.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/transactions/transaction_tracker.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/debug/trace_event.h"
@@ -442,7 +442,7 @@ Status TransactionDriver::ApplyAsync() {
order_verifier_->CheckApply(op_id_copy_.index(), prepare_physical_timestamp_);
// Now that the transaction is committed in consensus advance the safe time.
if (transaction_->state()->external_consistency_mode() != COMMIT_WAIT) {
- transaction_->state()->tablet_peer()->tablet()->mvcc_manager()->
+ transaction_->state()->tablet_replica()->tablet()->mvcc_manager()->
AdjustSafeTime(transaction_->state()->timestamp());
}
} else {
@@ -515,7 +515,7 @@ void TransactionDriver::SetResponseTimestamp(TransactionState* transaction_state
Status TransactionDriver::CommitWait() {
MonoTime before = MonoTime::Now();
DCHECK(mutable_state()->external_consistency_mode() == COMMIT_WAIT);
- RETURN_NOT_OK(mutable_state()->tablet_peer()->clock()->WaitUntilAfter(
+ RETURN_NOT_OK(mutable_state()->tablet_replica()->clock()->WaitUntilAfter(
mutable_state()->timestamp(), MonoTime::Max()));
mutable_state()->mutable_metrics()->commit_wait_duration_usec =
(MonoTime::Now() - before).ToMicroseconds();
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.cc b/src/kudu/tablet/transactions/transaction_tracker.cc
index 3661747..3ac55a9 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.cc
+++ b/src/kudu/tablet/transactions/transaction_tracker.cc
@@ -23,7 +23,7 @@
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet_peer.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
@@ -97,12 +97,12 @@ Status TransactionTracker::Add(TransactionDriver* driver) {
}
// May be null in unit tests.
- TabletPeer* peer = driver->state()->tablet_peer();
+ TabletReplica* replica = driver->state()->tablet_replica();
string msg = Substitute(
"Transaction failed, tablet $0 transaction memory consumption ($1) "
"has exceeded its limit ($2) or the limit of an ancestral tracker",
- peer ? peer->tablet()->tablet_id() : "(unknown)",
+ replica ? replica->tablet()->tablet_id() : "(unknown)",
mem_tracker_->consumption(), mem_tracker_->limit());
KLOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/transactions/transaction_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/transactions/transaction_tracker.h b/src/kudu/tablet/transactions/transaction_tracker.h
index 88a9103..3a3725b 100644
--- a/src/kudu/tablet/transactions/transaction_tracker.h
+++ b/src/kudu/tablet/transactions/transaction_tracker.h
@@ -39,7 +39,7 @@ class MetricEntity;
namespace tablet {
class TransactionDriver;
-// Each TabletPeer has a TransactionTracker which keeps track of pending transactions.
+// Each TabletReplica has a TransactionTracker which keeps track of pending transactions.
// Each "LeaderTransaction" will register itself by calling Add().
// It will remove itself by calling Release().
class TransactionTracker {