You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/04 22:14:29 UTC
[4/5] kudu git commit: Rename TabletPeer to TabletReplica
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer-test.cc b/src/kudu/tablet/tablet_peer-test.cc
deleted file mode 100644
index 95c7d0c..0000000
--- a/src/kudu/tablet/tablet_peer-test.cc
+++ /dev/null
@@ -1,564 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <glog/logging.h>
-#include <gtest/gtest.h>
-
-#include "kudu/common/partial_row.h"
-#include "kudu/common/timestamp.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol-test-util.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_reader.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/server/clock.h"
-#include "kudu/server/logical_clock.h"
-#include "kudu/tablet/transactions/transaction.h"
-#include "kudu/tablet/transactions/transaction_driver.h"
-#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-#include "kudu/tablet/tablet-test-util.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/test_macros.h"
-#include "kudu/util/test_util.h"
-#include "kudu/util/threadpool.h"
-
-METRIC_DECLARE_entity(tablet);
-
-DECLARE_int32(flush_threshold_mb);
-
-namespace kudu {
-namespace tablet {
-
-using consensus::CommitMsg;
-using consensus::Consensus;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::MakeOpId;
-using consensus::MinimumOpId;
-using consensus::OpId;
-using consensus::OpIdEquals;
-using consensus::RaftPeerPB;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using log::LogOptions;
-using rpc::Messenger;
-using server::Clock;
-using server::LogicalClock;
-using std::shared_ptr;
-using std::string;
-using std::unique_ptr;
-using strings::Substitute;
-using tserver::WriteRequestPB;
-using tserver::WriteResponsePB;
-
-// Builds the schema used by every test in this file: one INT32 column named
-// "key". The trailing constructor argument is passed as 1 (presumably the
-// number of leading key columns -- confirm against Schema's constructor).
-static Schema GetTestSchema() {
-  const int kSecondCtorArg = 1;
-  return Schema({ ColumnSchema("key", INT32) }, kSecondCtorArg);
-}
-
-// Test fixture that layers a TabletPeer on top of the Tablet created by
-// KuduTabletTest. SetUp() constructs and initializes the peer (single-voter
-// Raft config, real Log) but does not start it; tests call StartPeer() or
-// TabletPeer::Start() themselves.
-class TabletPeerTest : public KuduTabletTest {
- public:
-  TabletPeerTest()
-    : KuduTabletTest(GetTestSchema()),
-      insert_counter_(0),
-      delete_counter_(0) {
-  }
-
-  // Creates the apply pool, messenger, metric entity, consensus metadata and
-  // log, then builds the TabletPeer and runs Init() on it.
-  virtual void SetUp() OVERRIDE {
-    KuduTabletTest::SetUp();
-
-    ASSERT_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
-
-    rpc::MessengerBuilder builder(CURRENT_TEST_NAME());
-    ASSERT_OK(builder.Build(&messenger_));
-
-    metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
-
-    RaftPeerPB config_peer;
-    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
-    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
-    config_peer.mutable_last_known_addr()->set_port(0);
-    config_peer.set_member_type(RaftPeerPB::VOTER);
-
-    // "Bootstrap" and start the TabletPeer.
-    tablet_peer_.reset(
-      new TabletPeer(make_scoped_refptr(tablet()->metadata()),
-                     config_peer,
-                     apply_pool_.get(),
-                     Bind(&TabletPeerTest::TabletPeerStateChangedCallback,
-                          Unretained(this),
-                          tablet()->tablet_id())));
-
-    // Make TabletPeer use the same LogAnchorRegistry as the Tablet created by the harness.
-    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
-    // TabletMetadata for consumption by TabletPeer before Tablet is instantiated.
-    tablet_peer_->log_anchor_registry_ = tablet()->log_anchor_registry_;
-
-    RaftConfigPB config;
-    config.add_peers()->CopyFrom(config_peer);
-    config.set_opid_index(consensus::kInvalidOpIdIndex);
-
-    unique_ptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
-                                        tablet()->tablet_id(),
-                                        tablet()->metadata()->fs_manager()->uuid(),
-                                        config,
-                                        consensus::kMinimumTerm, &cmeta));
-
-    scoped_refptr<Log> log;
-    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
-                        *tablet()->schema(), tablet()->metadata()->schema_version(),
-                        metric_entity_.get(), &log));
-
-    tablet_peer_->SetBootstrapping();
-    ASSERT_OK(tablet_peer_->Init(tablet(),
-                                 clock(),
-                                 messenger_,
-                                 scoped_refptr<rpc::ResultTracker>(),
-                                 log,
-                                 metric_entity_));
-  }
-
-  // Starts the peer and blocks (up to 10 seconds) until it is the leader.
-  Status StartPeer(const ConsensusBootstrapInfo& info) {
-    RETURN_NOT_OK(tablet_peer_->Start(info));
-    RETURN_NOT_OK(tablet_peer_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
-    return Status::OK();
-  }
-
-  // Callback handed to the TabletPeer constructor; it only logs the change.
-  void TabletPeerStateChangedCallback(const string& tablet_id, const string& reason) {
-    LOG(INFO) << "Tablet peer state changed for tablet " << tablet_id << ". Reason: " << reason;
-  }
-
-  // gtest still invokes TearDown() when SetUp() returned early on a failed
-  // ASSERT_OK, in which case the peer and/or the pool may never have been
-  // created. Only shut down what actually exists instead of dereferencing null.
-  virtual void TearDown() OVERRIDE {
-    if (tablet_peer_) {
-      tablet_peer_->Shutdown();
-    }
-    if (apply_pool_) {
-      apply_pool_->Shutdown();
-    }
-    KuduTabletTest::TearDown();
-  }
-
- protected:
-  // Fills 'write_req' with an INSERT of the next key in a monotonically
-  // increasing sequence (insert_counter_ is advanced on every call).
-  Status GenerateSequentialInsertRequest(WriteRequestPB* write_req) {
-    Schema schema(GetTestSchema());
-    write_req->set_tablet_id(tablet()->tablet_id());
-    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
-
-    KuduPartialRow row(&schema);
-    CHECK_OK(row.SetInt32("key", insert_counter_++));
-
-    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
-    enc.Add(RowOperationsPB::INSERT, row);
-    return Status::OK();
-  }
-
-  // Fills 'write_req' with a DELETE of the next key in a monotonically
-  // increasing sequence, starting with 0.
-  // Will assert if you try to delete more rows than you inserted.
-  Status GenerateSequentialDeleteRequest(WriteRequestPB* write_req) {
-    CHECK_LT(delete_counter_, insert_counter_);
-    Schema schema(GetTestSchema());
-    write_req->set_tablet_id(tablet()->tablet_id());
-    CHECK_OK(SchemaToPB(schema, write_req->mutable_schema()));
-
-    KuduPartialRow row(&schema);
-    CHECK_OK(row.SetInt32("key", delete_counter_++));
-
-    RowOperationsPBEncoder enc(write_req->mutable_row_operations());
-    enc.Add(RowOperationsPB::DELETE, row);
-    return Status::OK();
-  }
-
-  // Submits 'req' to 'tablet_peer', waits for the completion callback, checks
-  // that the response carries no error, and then rolls the log.
-  Status ExecuteWriteAndRollLog(TabletPeer* tablet_peer, const WriteRequestPB& req) {
-    gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer,
-                                                                         &req,
-                                                                         nullptr, // No RequestIdPB
-                                                                         resp.get()));
-
-    CountDownLatch rpc_latch(1);
-    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
-        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
-    CHECK_OK(tablet_peer->SubmitWrite(std::move(tx_state)));
-    rpc_latch.Wait();
-    CHECK(!resp->has_error())
-        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
-
-    // Roll the log after each write.
-    // Usually the append thread does the roll and no additional sync is required. However in
-    // this test the thread that is appending is not the same thread that is rolling the log
-    // so we must make sure the Log's queue is flushed before we roll or we might have a race
-    // between the appender thread and the thread executing the test.
-    CHECK_OK(tablet_peer->log_->WaitUntilAllFlushed());
-    CHECK_OK(tablet_peer->log_->AllocateSegmentAndRollOver());
-    return Status::OK();
-  }
-
-  // Execute insert requests and roll log after each one.
-  Status ExecuteInsertsAndRollLogs(int num_inserts) {
-    for (int i = 0; i < num_inserts; i++) {
-      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req));
-    }
-
-    return Status::OK();
-  }
-
-  // Execute delete requests and roll log after each one.
-  // Failures are propagated to the caller, mirroring
-  // ExecuteInsertsAndRollLogs(), rather than aborting the process.
-  Status ExecuteDeletesAndRollLogs(int num_deletes) {
-    for (int i = 0; i < num_deletes; i++) {
-      gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialDeleteRequest(req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_peer_.get(), *req));
-    }
-
-    return Status::OK();
-  }
-
-  void AssertNoLogAnchors() {
-    // Make sure that there are no registered anchors in the registry
-    CHECK_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  }
-
-  // Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
-  void AssertLogAnchorEarlierThanLogLatest() {
-    log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-    OpId last_log_opid;
-    tablet_peer_->log_->GetLatestEntryOpId(&last_log_opid);
-    CHECK_LT(retention.for_durability, last_log_opid.index())
-        << "Expected valid log anchor, got earliest opid: " << retention.for_durability
-        << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
-        << ")";
-  }
-
-  // We disable automatic log GC. Don't leak those changes.
-  google::FlagSaver flag_saver_;
-
-  // Next key to insert / delete; see the GenerateSequential*Request() helpers.
-  int32_t insert_counter_;
-  int32_t delete_counter_;
-  MetricRegistry metric_registry_;
-  scoped_refptr<MetricEntity> metric_entity_;
-  shared_ptr<Messenger> messenger_;
-  scoped_refptr<TabletPeer> tablet_peer_;
-  gscoped_ptr<ThreadPool> apply_pool_;
-};
-
-// WriteTransaction whose Apply() can be paused by the test.
-//
-// On entry Apply() counts down 'apply_started', then blocks on
-// 'apply_continue' before delegating to WriteTransaction::Apply(). Both
-// latches are borrowed raw pointers and must be non-null.
-class DelayedApplyTransaction : public WriteTransaction {
- public:
-  DelayedApplyTransaction(CountDownLatch* apply_started,
-                          CountDownLatch* apply_continue,
-                          unique_ptr<WriteTransactionState> state)
-      : WriteTransaction(std::move(state), consensus::LEADER),
-        apply_started_(DCHECK_NOTNULL(apply_started)),
-        apply_continue_(DCHECK_NOTNULL(apply_continue)) {}
-
-  virtual Status Apply(gscoped_ptr<CommitMsg>* commit_msg) OVERRIDE {
-    // Announce that the apply phase was reached before parking.
-    apply_started_->CountDown();
-    LOG(INFO) << "Delaying apply...";
-
-    // Park here until the latch is released.
-    apply_continue_->Wait();
-    LOG(INFO) << "Apply proceeding";
-
-    return WriteTransaction::Apply(commit_msg);
-  }
-
- private:
-  CountDownLatch* apply_started_;
-  CountDownLatch* apply_continue_;
-
-  DISALLOW_COPY_AND_ASSIGN(DelayedApplyTransaction);
-};
-
-// Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
-TEST_F(TabletPeerTest, TestMRSAnchorPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  // Initialized so the value is never indeterminate when it is printed or
-  // compared below.
-  int32_t num_gced = 0;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  // Each insert rolls the log, so three inserts add three segments.
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(3));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(4, segments.size());
-
-  AssertLogAnchorEarlierThanLogLatest();
-  ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0);
-
-  // Ensure nothing gets deleted.
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(0, num_gced) << "earliest needed: " << retention.for_durability;
-
-  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
-  // The flush must succeed; otherwise the assertions below are meaningless.
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  AssertNoLogAnchors();
-
-  // The first two segments should be deleted.
-  // The last is anchored due to the commit in the last segment being the last
-  // OpId in the log.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(2, num_gced) << "earliest needed: " << retention.for_durability;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
-TEST_F(TabletPeerTest, TestDMSAnchorPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  // Initialized so the value is never indeterminate when it is compared below.
-  int32_t num_gced = 0;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(3, segments.size());
-
-  // Flush MRS & GC log so the next mutation goes into a DMS.
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  // We will only GC 1 segment, because the earliest needed OpId falls back to
-  // the latest OpId written to the Log if no anchors are set.
-  ASSERT_EQ(1, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-  AssertNoLogAnchors();
-
-  OpId id;
-  log->GetLatestEntryOpId(&id);
-  LOG(INFO) << "Before: " << SecureShortDebugString(id);
-
-  // At this point there are no anchors. Two deletes are executed (rather than
-  // one) on purpose: after a single mutation the anchored operation would also
-  // be the last operation in the log, so AssertLogAnchorEarlierThanLogLatest()
-  // could not hold. Only with two mutations do the last anchored operation and
-  // the last operation in the log differ.
-
-  // Execute the mutations.
-  ASSERT_OK(ExecuteDeletesAndRollLogs(2));
-  AssertLogAnchorEarlierThanLogLatest();
-  ASSERT_GT(tablet_peer_->log_anchor_registry()->GetAnchorCountForTests(), 0);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(4, segments.size());
-
-  // Execute another couple inserts, but Flush it so it doesn't anchor.
-  ASSERT_OK(ExecuteInsertsAndRollLogs(2));
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(6, segments.size());
-
-  // Ensure the delta and last insert remain in the logs, anchored by the delta.
-  // Note that this will allow GC of the 2nd insert done above.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(1, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Flush DMS to release the anchor. A failed flush would leave the anchor in
-  // place, so surface the error here instead of in a later assertion.
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-
-  // Verify no anchors after Flush().
-  AssertNoLogAnchors();
-
-  // We should only hang onto one segment due to no anchors.
-  // The last log OpId is the commit in the last segment, so it only anchors
-  // that segment, not the previous, because it's not the first OpId in the
-  // segment.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(3, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
-TEST_F(TabletPeerTest, TestActiveTransactionPreventsLogGC) {
-  ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
-
-  Log* log = tablet_peer_->log_.get();
-  // Initialized so the value is never indeterminate when it is compared below.
-  int32_t num_gced = 0;
-
-  AssertNoLogAnchors();
-
-  log::SegmentSequence segments;
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-
-  ASSERT_EQ(1, segments.size());
-  ASSERT_OK(ExecuteInsertsAndRollLogs(4));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Flush MRS as needed to ensure that we don't have OpId anchors in the MRS.
-  ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  ASSERT_OK(tablet_peer_->tablet()->Flush());
-
-  // Verify no anchors after Flush().
-  AssertNoLogAnchors();
-
-  // Now create a long-lived Transaction that hangs during Apply().
-  // Allow other transactions to go through. Logs should be populated, but the
-  // long-lived Transaction should prevent the log from being deleted since it
-  // is in-flight.
-  CountDownLatch rpc_latch(1);
-  CountDownLatch apply_started(1);
-  CountDownLatch apply_continue(1);
-  gscoped_ptr<WriteRequestPB> req(new WriteRequestPB());
-  gscoped_ptr<WriteResponsePB> resp(new WriteResponsePB());
-  {
-    // Long-running mutation.
-    ASSERT_OK(GenerateSequentialDeleteRequest(req.get()));
-    unique_ptr<WriteTransactionState> tx_state(new WriteTransactionState(tablet_peer_.get(),
-                                                                         req.get(),
-                                                                         nullptr, // No RequestIdPB
-                                                                         resp.get()));
-
-    tx_state->set_completion_callback(gscoped_ptr<TransactionCompletionCallback>(
-        new LatchTransactionCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
-    gscoped_ptr<DelayedApplyTransaction> transaction(
-        new DelayedApplyTransaction(&apply_started,
-                                    &apply_continue,
-                                    std::move(tx_state)));
-
-    scoped_refptr<TransactionDriver> driver;
-    ASSERT_OK(tablet_peer_->NewLeaderTransactionDriver(transaction.PassAs<Transaction>(),
-                                                       &driver));
-
-    ASSERT_OK(driver->ExecuteAsync());
-    apply_started.Wait();
-    ASSERT_TRUE(driver->GetOpId().IsInitialized())
-        << "By the time a transaction is applied, it should have an Opid";
-    // The apply will hang until we CountDown() the continue latch.
-    // Now, roll the log. Below, we execute a few more insertions with rolling.
-    ASSERT_OK(log->AllocateSegmentAndRollOver());
-  }
-
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  // The log anchor is currently equal to the latest OpId written to the Log
-  // because we are delaying the Commit message with the CountDownLatch.
-
-  // GC the first four segments created by the inserts.
-  log::RetentionIndexes retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(4, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-
-  // We use mutations here, since an MRS Flush() quiesces the tablet, and we
-  // want to ensure the only thing "anchoring" is the TransactionTracker.
-  ASSERT_OK(ExecuteDeletesAndRollLogs(3));
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-  ASSERT_EQ(1, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  // Check the flush result: a failed flush would keep the DMS anchor alive.
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-  ASSERT_EQ(0, tablet_peer_->log_anchor_registry()->GetAnchorCountForTests());
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-
-  AssertLogAnchorEarlierThanLogLatest();
-
-  // Try to GC(), nothing should be deleted due to the in-flight transaction.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(0, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(5, segments.size());
-
-  // Now we release the transaction and wait for everything to complete.
-  // We fully quiesce and flush, which should release all anchors.
-  ASSERT_EQ(1, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  apply_continue.CountDown();
-  rpc_latch.Wait();
-  tablet_peer_->txn_tracker_.WaitForAllToFinish();
-  ASSERT_EQ(0, tablet_peer_->txn_tracker_.GetNumPendingForTests());
-  ASSERT_OK(tablet_peer_->tablet()->FlushBiggestDMS());
-  AssertNoLogAnchors();
-
-  // All should be deleted except the two last segments.
-  retention = tablet_peer_->GetRetentionIndexes();
-  ASSERT_OK(log->GC(retention, &num_gced));
-  ASSERT_EQ(3, num_gced);
-  ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
-  ASSERT_EQ(2, segments.size());
-}
-
-// Running log GC right after starting the peer, before any write has been
-// submitted, must succeed.
-TEST_F(TabletPeerTest, TestGCEmptyLog) {
-  ConsensusBootstrapInfo info;
-  // Start() returns a Status; if it fails the peer never reaches RUNNING and
-  // the RunLogGC() check below would not be exercising the intended path.
-  ASSERT_OK(tablet_peer_->Start(info));
-  // We don't wait on consensus on purpose.
-  ASSERT_OK(tablet_peer_->RunLogGC());
-}
-
-// Exercises FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush() with
-// several combinations of anchored RAM and elapsed time, against a flush
-// threshold of 64 MB.
-TEST_F(TabletPeerTest, TestFlushOpsPerfImprovements) {
-  const int kBytesPerMB = 1024 * 1024;
-  FLAGS_flush_threshold_mb = 64;
-
-  MaintenanceOpStats stats;
-
-  // Case 1: exactly at the threshold, but too little time has passed for a
-  // time-based flush.
-  stats.set_ram_anchored(64 * kBytesPerMB);
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
-  ASSERT_EQ(0.0, stats.perf_improvement());
-  stats.Clear();
-
-  // Case 2: exactly at the threshold and enough time has passed; expect a low
-  // improvement.
-  stats.set_ram_anchored(64 * kBytesPerMB);
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 3 * 60 * 1000);
-  ASSERT_GT(stats.perf_improvement(), 0.01);
-  stats.Clear();
-
-  // Case 3: far above the threshold; the improvement is greater than 1.
-  stats.set_ram_anchored(128 * kBytesPerMB);
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 1);
-  ASSERT_LT(1.0, stats.perf_improvement());
-  stats.Clear();
-
-  // Case 4: under the threshold for a long time; the improvement approaches,
-  // but stays below, 1.0.
-  stats.set_ram_anchored(30 * kBytesPerMB);
-  FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(&stats, 60 * 50 * 1000);
-  ASSERT_LT(0.7, stats.perf_improvement());
-  ASSERT_GT(1.0, stats.perf_improvement());
-  stats.Clear();
-}
-
-} // namespace tablet
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.cc b/src/kudu/tablet/tablet_peer.cc
deleted file mode 100644
index d9bddb4..0000000
--- a/src/kudu/tablet/tablet_peer.cc
+++ /dev/null
@@ -1,672 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/tablet_peer.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "kudu/consensus/consensus.h"
-#include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/log_util.h"
-#include "kudu/consensus/opid_util.h"
-#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus.h"
-#include "kudu/gutil/mathlimits.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/sysinfo.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/remote_method.h"
-#include "kudu/rpc/rpc_service.h"
-#include "kudu/rpc/service_pool.h"
-#include "kudu/tablet/transactions/transaction_driver.h"
-#include "kudu/tablet/transactions/alter_schema_transaction.h"
-#include "kudu/tablet/transactions/write_transaction.h"
-#include "kudu/tablet/tablet_bootstrap.h"
-#include "kudu/tablet/tablet_metrics.h"
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/util/logging.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/threadpool.h"
-#include "kudu/util/trace.h"
-
-using std::map;
-using std::shared_ptr;
-using std::unique_ptr;
-
-namespace kudu {
-namespace tablet {
-
-METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",
- MetricUnit::kTasks,
- "Number of operations waiting to be prepared within this tablet. "
- "High queue lengths indicate that the server is unable to process "
- "operations as fast as they are being written to the WAL.",
- 10000, 2);
-
-METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time",
- MetricUnit::kMicroseconds,
- "Time that operations spent waiting in the prepare queue before being "
- "processed. High queue times indicate that the server is unable to "
- "process operations as fast as they are being written to the WAL.",
- 10000000, 2);
-
-METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time",
- MetricUnit::kMicroseconds,
- "Time that operations spent being prepared in the tablet. "
- "High values may indicate that the server is under-provisioned or "
- "that operations are experiencing high contention with one another for "
- "locks.",
- 10000000, 2);
-
-using consensus::Consensus;
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
-using consensus::ConsensusOptions;
-using consensus::ConsensusRound;
-using consensus::OpId;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using consensus::RaftConsensus;
-using consensus::TimeManager;
-using consensus::ALTER_SCHEMA_OP;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using rpc::Messenger;
-using rpc::ResultTracker;
-using strings::Substitute;
-using tserver::TabletServerErrorPB;
-
-// ============================================================================
-// Tablet Peer
-// ============================================================================
-// Creates a peer in the NOT_STARTED state with a fresh LogAnchorRegistry.
-// 'apply_pool' is stored as a raw pointer; 'mark_dirty_clbk' is moved into
-// the peer.
-TabletPeer::TabletPeer(const scoped_refptr<TabletMetadata>& meta,
-                       const consensus::RaftPeerPB& local_peer_pb,
-                       ThreadPool* apply_pool,
-                       Callback<void(const std::string& reason)> mark_dirty_clbk)
-    : meta_(meta),
-      tablet_id_(meta->tablet_id()),
-      local_peer_pb_(local_peer_pb),
-      state_(NOT_STARTED),
-      last_status_("Tablet initializing..."),
-      apply_pool_(apply_pool),
-      log_anchor_registry_(new LogAnchorRegistry()),
-      mark_dirty_clbk_(std::move(mark_dirty_clbk)) {
-}
-
-// Verifies that the peer no longer holds its Tablet when it is destroyed.
-TabletPeer::~TabletPeer() {
-  // tablet_ is set by Init() and reset by Shutdown(), so it must be null here:
-  // either Shutdown() ran, or Init() was never called.
-  std::lock_guard<simple_spinlock> guard(lock_);
-  CHECK(!tablet_) << "TabletPeer not fully shut down. State: "
-                  << TabletStatePB_Name(state_);
-}
-
-// Wires the peer up to its Tablet, clock, messenger, result tracker and Log,
-// creates the single-threaded "prepare" pool and the RaftConsensus instance.
-// The peer must already be in the BOOTSTRAPPING state. Returns a non-OK
-// Status if the pool cannot be built or the consensus metadata cannot be
-// loaded.
-Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
-                        const scoped_refptr<server::Clock>& clock,
-                        const shared_ptr<Messenger>& messenger,
-                        const scoped_refptr<ResultTracker>& result_tracker,
-                        const scoped_refptr<Log>& log,
-                        const scoped_refptr<MetricEntity>& metric_entity) {
-  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
-  DCHECK(log) << "A TabletPeer must be provided with a Log";
-
-  // Prepare pool: one thread, instrumented with the op_prepare_* histograms.
-  ThreadPoolBuilder prepare_pool_builder("prepare");
-  prepare_pool_builder.set_max_threads(1);
-  RETURN_NOT_OK(prepare_pool_builder.Build(&prepare_pool_));
-  prepare_pool_->SetQueueLengthHistogram(
-      METRIC_op_prepare_queue_length.Instantiate(metric_entity));
-  prepare_pool_->SetQueueTimeMicrosHistogram(
-      METRIC_op_prepare_queue_time.Instantiate(metric_entity));
-  prepare_pool_->SetRunTimeMicrosHistogram(
-      METRIC_op_prepare_run_time.Instantiate(metric_entity));
-
-  {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    CHECK_EQ(BOOTSTRAPPING, state_);
-
-    // Publish the collaborators handed in by the caller.
-    tablet_ = tablet;
-    clock_ = clock;
-    messenger_ = messenger;
-    log_ = log;
-    result_tracker_ = result_tracker;
-
-    ConsensusOptions consensus_opts;
-    consensus_opts.tablet_id = meta_->tablet_id();
-
-    TRACE("Creating consensus instance");
-
-    unique_ptr<ConsensusMetadata> consensus_meta;
-    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
-                                          meta_->fs_manager()->uuid(), &consensus_meta));
-
-    scoped_refptr<TimeManager> time_mgr(new TimeManager(
-        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
-
-    consensus_ = RaftConsensus::Create(consensus_opts,
-                                       std::move(consensus_meta),
-                                       local_peer_pb_,
-                                       metric_entity,
-                                       time_mgr,
-                                       this,
-                                       messenger_,
-                                       log_.get(),
-                                       tablet_->mem_tracker(),
-                                       mark_dirty_clbk_);
-  }
-
-  // Transaction tracker instrumentation is only started when the tablet
-  // exposes metrics; memory tracking is always started.
-  if (tablet_->metrics() != nullptr) {
-    TRACE("Starting instrumentation");
-    txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
-  }
-  txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
-
-  TRACE("TabletPeer::Init() finished");
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
-  return Status::OK();
-}
-
-// Starts consensus with 'bootstrap_info' and, on success, moves the peer from
-// BOOTSTRAPPING to RUNNING and invokes the mark-dirty callback. Returns the
-// error from consensus start-up unchanged if it fails.
-Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
-  std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
-  TRACE("Starting consensus");
-
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
-  VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
-
-  RETURN_NOT_OK(consensus_->Start(bootstrap_info));
-
-  {
-    // The state transition itself is guarded by lock_.
-    std::lock_guard<simple_spinlock> guard(lock_);
-    CHECK_EQ(state_, BOOTSTRAPPING);
-    state_ = RUNNING;
-  }
-
-  // Because we changed the tablet state, we need to re-report the tablet to the master.
-  mark_dirty_clbk_.Run("Started TabletPeer");
-
-  return Status::OK();
-}
-
-// Returns the committed Raft configuration reported by consensus.
-// Crashes (CHECK) if the consensus instance is null.
-const consensus::RaftConfigPB TabletPeer::RaftConfig() const {
-  CHECK(consensus_) << "consensus is null";
-  RaftConfigPB committed_config = consensus_->CommittedConfig();
-  return committed_config;
-}
-
-// Shuts the peer down: unregisters maintenance ops, stops consensus, waits
-// for in-flight transactions, stops the prepare pool, closes the log and shuts
-// down the tablet, in that order. If the peer is already QUIESCING or
-// SHUTDOWN, this call only waits for the SHUTDOWN state and returns.
-void TabletPeer::Shutdown() {
-  LOG(INFO) << "Initiating TabletPeer shutdown for tablet: " << tablet_id_;
-
-  // Claim the shutdown by moving to QUIESCING, unless it was already claimed.
-  bool shutdown_already_claimed;
-  {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    shutdown_already_claimed = (state_ == QUIESCING || state_ == SHUTDOWN);
-    if (!shutdown_already_claimed) {
-      state_ = QUIESCING;
-    }
-  }
-  if (shutdown_already_claimed) {
-    WaitUntilShutdown();
-    return;
-  }
-
-  std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
-  // Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
-  // to ensure that any currently running operation finishes before we proceed with
-  // the rest of the shutdown sequence. In particular, a maintenance operation could
-  // indirectly end up calling into the log, which we are about to shut down.
-  if (tablet_) {
-    tablet_->UnregisterMaintenanceOps();
-  }
-  UnregisterMaintenanceOps();
-
-  if (consensus_) {
-    consensus_->Shutdown();
-  }
-
-  // TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
-  LOG_SLOW_EXECUTION(WARNING, 1000,
-      Substitute("TabletPeer: tablet $0: Waiting for Transactions to complete", tablet_id())) {
-    txn_tracker_.WaitForAllToFinish();
-  }
-
-  if (prepare_pool_) {
-    prepare_pool_->Shutdown();
-  }
-
-  if (log_) {
-    WARN_NOT_OK(log_->Close(), "Error closing the Log.");
-  }
-
-  // VLOG(1) is itself a no-op unless verbosity >= 1, so no extra guard is needed.
-  VLOG(1) << "TabletPeer: tablet " << tablet_id() << " shut down!";
-
-  if (tablet_) {
-    tablet_->Shutdown();
-  }
-
-  // Only mark the peer as SHUTDOWN when all other components have shut down.
-  {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    // Release mem tracker resources.
-    consensus_.reset();
-    tablet_.reset();
-    state_ = SHUTDOWN;
-  }
-}
-
-// Blocks until the peer reaches the SHUTDOWN state, polling every 10 ms.
-// There is no timeout.
-void TabletPeer::WaitUntilShutdown() {
-  for (;;) {
-    bool is_shut_down;
-    {
-      // Hold lock_ only long enough to sample the state.
-      std::lock_guard<simple_spinlock> guard(lock_);
-      is_shut_down = (state_ == SHUTDOWN);
-    }
-    if (is_shut_down) {
-      return;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-}
-
-// Returns OK if the peer is in the RUNNING state, otherwise an IllegalState
-// status naming the current state.
-Status TabletPeer::CheckRunning() const {
-  TabletStatePB current_state;
-  {
-    std::lock_guard<simple_spinlock> guard(lock_);
-    current_state = state_;
-  }
-  if (current_state == RUNNING) {
-    return Status::OK();
-  }
-  return Status::IllegalState(Substitute("The tablet is not in a running state: $0",
-                                         TabletStatePB_Name(current_state)));
-}
-
-// Polls, with exponential backoff (1 ms doubling up to 256 ms), until the peer
-// is RUNNING and its consensus instance reports IsRunning().
-//
-// Returns:
-//   OK            once consensus is running;
-//   IllegalState  if the peer is QUIESCING or SHUTDOWN;
-//   TimedOut      if more than 'timeout' elapses first.
-Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
-  MonoTime start(MonoTime::Now());
-
-  int backoff_exp = 0;
-  const int kMaxBackoffExp = 8;
-  while (true) {
-    TabletStatePB cached_state;
-    // Take our own reference to the consensus instance while holding lock_.
-    // Shutdown() resets consensus_ under the same lock, so dereferencing the
-    // member after releasing the lock could race with a concurrent shutdown.
-    decltype(consensus_) consensus;
-    {
-      std::lock_guard<simple_spinlock> lock(lock_);
-      cached_state = state_;
-      consensus = consensus_;
-    }
-    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
-      return Status::IllegalState(
-          Substitute("The tablet is already shutting down or shutdown. State: $0",
-                     TabletStatePB_Name(cached_state)));
-    }
-    if (cached_state == RUNNING && consensus && consensus->IsRunning()) {
-      break;
-    }
-    MonoTime now(MonoTime::Now());
-    MonoDelta elapsed(now - start);
-    if (elapsed > timeout) {
-      return Status::TimedOut(Substitute("Consensus is not running after waiting for $0. State: $1",
-                                         elapsed.ToString(), TabletStatePB_Name(cached_state)));
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
-    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
-  }
-  return Status::OK();
-}
-
-// Wraps 'state' in a leader-side WriteTransaction and starts executing it
-// asynchronously. Returns the CheckRunning() error if the peer is not RUNNING,
-// or any error from creating or starting the transaction driver.
-Status TabletPeer::SubmitWrite(unique_ptr<WriteTransactionState> state) {
-  RETURN_NOT_OK(CheckRunning());
-
-  state->SetResultTracker(result_tracker_);
-  gscoped_ptr<WriteTransaction> write_txn(
-      new WriteTransaction(std::move(state), consensus::LEADER));
-
-  scoped_refptr<TransactionDriver> txn_driver;
-  RETURN_NOT_OK(NewLeaderTransactionDriver(write_txn.PassAs<Transaction>(), &txn_driver));
-  return txn_driver->ExecuteAsync();
-}
-
-// Wraps 'state' in a leader-side AlterSchemaTransaction and starts executing it
-// asynchronously. Returns the CheckRunning() error if the peer is not RUNNING,
-// or any error from creating or starting the transaction driver.
-Status TabletPeer::SubmitAlterSchema(unique_ptr<AlterSchemaTransactionState> state) {
-  RETURN_NOT_OK(CheckRunning());
-
-  gscoped_ptr<AlterSchemaTransaction> alter_txn(
-      new AlterSchemaTransaction(std::move(state), consensus::LEADER));
-
-  scoped_refptr<TransactionDriver> txn_driver;
-  RETURN_NOT_OK(NewLeaderTransactionDriver(alter_txn.PassAs<Transaction>(), &txn_driver));
-  return txn_driver->ExecuteAsync();
-}
-
-// Fills 'status_pb_out' with a snapshot of this peer's status, taken while
-// holding 'lock_'. The on-disk size estimate is set only while 'tablet_' is
-// non-null.
-void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) const {
-  std::lock_guard<simple_spinlock> l(lock_);
-  DCHECK(status_pb_out != nullptr);
-
-  // Fields sourced from the tablet metadata.
-  status_pb_out->set_tablet_id(meta_->tablet_id());
-  status_pb_out->set_table_name(meta_->table_name());
-  status_pb_out->set_last_status(last_status_);
-  meta_->partition().ToPB(status_pb_out->mutable_partition());
-
-  // Runtime state of the peer and persistent state of the data.
-  status_pb_out->set_state(state_);
-  status_pb_out->set_tablet_data_state(meta_->tablet_data_state());
-
-  if (!tablet_) {
-    return;
-  }
-  status_pb_out->set_estimated_on_disk_size(tablet_->EstimateOnDiskSize());
-}
-
-// Asks the log to garbage collect using the current retention indexes.
-//
-// Always returns OK: the call is a no-op when the peer is not RUNNING, and a GC
-// failure is logged at ERROR level rather than propagated.
-Status TabletPeer::RunLogGC() {
-  if (!CheckRunning().ok()) {
-    return Status::OK();
-  }
-
-  log::RetentionIndexes retention_indexes = GetRetentionIndexes();
-  int32_t num_gced;
-  Status gc_status = log_->GC(retention_indexes, &num_gced);
-  if (gc_status.ok()) {
-    return Status::OK();
-  }
-
-  gc_status = gc_status.CloneAndPrepend("Unexpected error while running Log GC from TabletPeer");
-  LOG(ERROR) << gc_status.ToString();
-  return Status::OK();
-}
-
-// TabletStatusListener implementation: records 'status' as the peer's most
-// recent human-readable status, under 'lock_'.
-void TabletPeer::StatusMessage(const std::string& status) {
-  std::lock_guard<simple_spinlock> l(lock_);
-  last_status_ = status;
-}
-
-// Returns a copy of the most recently recorded status string, read under
-// 'lock_'.
-string TabletPeer::last_status() const {
-  std::lock_guard<simple_spinlock> l(lock_);
-  return last_status_;
-}
-
-void TabletPeer::SetFailed(const Status& error) {
- std::lock_guard<simple_spinlock> lock(lock_);
- CHECK(!error.ok());
- state_ = FAILED;
- error_ = error;
- last_status_ = error.ToString();
-}
-
-// Builds a short description of the peer's state, in priority order:
-//   1. FAILED: "<state> (<data state>): <error>".
-//   2. Data state other than TABLET_DATA_READY: the data state name.
-//   3. Otherwise: the runtime state name (BOOTSTRAPPING, RUNNING, etc).
-string TabletPeer::HumanReadableState() const {
-  std::lock_guard<simple_spinlock> l(lock_);
-  const TabletDataState data_state = meta_->tablet_data_state();
-
-  // If failed, any number of things could have gone wrong, so show everything.
-  if (state_ == FAILED) {
-    return Substitute("$0 ($1): $2", TabletStatePB_Name(state_),
-                      TabletDataState_Name(data_state),
-                      error_.ToString());
-  }
-
-  // A tablet that is copying or tombstoned is best described by its data state.
-  if (data_state != TABLET_DATA_READY) {
-    return TabletDataState_Name(data_state);
-  }
-
-  // The data is in a "normal" state, so the runtime state is what matters.
-  return TabletStatePB_Name(state_);
-}
-
-// Appends one TransactionStatusPB to 'out' for every pending transaction whose
-// driver has a non-null state. A trace dump is attached to each entry only when
-// 'trace_type' is Transaction::TRACE_TXNS.
-void TabletPeer::GetInFlightTransactions(Transaction::TraceType trace_type,
-                                         vector<consensus::TransactionStatusPB>* out) const {
-  vector<scoped_refptr<TransactionDriver> > in_flight;
-  txn_tracker_.GetPendingTransactions(&in_flight);
-
-  for (const scoped_refptr<TransactionDriver>& txn_driver : in_flight) {
-    // Drivers without a state are skipped entirely.
-    if (txn_driver->state() == nullptr) {
-      continue;
-    }
-
-    consensus::TransactionStatusPB pb;
-    pb.mutable_op_id()->CopyFrom(txn_driver->GetOpId());
-    switch (txn_driver->tx_type()) {
-      case Transaction::WRITE_TXN:
-        pb.set_tx_type(consensus::WRITE_OP);
-        break;
-      case Transaction::ALTER_SCHEMA_TXN:
-        pb.set_tx_type(consensus::ALTER_SCHEMA_OP);
-        break;
-    }
-    pb.set_description(txn_driver->ToString());
-
-    const MonoDelta running_for = MonoTime::Now() - txn_driver->start_time();
-    int64_t running_for_micros = running_for.ToMicroseconds();
-    pb.set_running_for_micros(running_for_micros);
-
-    if (trace_type == Transaction::TRACE_TXNS) {
-      pb.set_trace_buffer(txn_driver->trace()->DumpToString());
-    }
-    out->push_back(pb);
-  }
-}
-
-// Computes the log indexes that must be retained. Starts from the values
-// reported by consensus, then lowers 'for_durability' to the earliest
-// registered log anchor and to the index of every pending transaction that
-// already has an OpId.
-log::RetentionIndexes TabletPeer::GetRetentionIndexes() const {
-  // Consensus sets the baseline so that we:
-  //  (a) don't GC any operations which are still in flight
-  //  (b) don't GC any operations that are needed to catch up lagging peers.
-  log::RetentionIndexes indexes = consensus_->GetRetentionIndexes();
-
-  // Nothing has ever been written to the log: there is nothing to refine.
-  if (indexes.for_durability == 0) {
-    return indexes;
-  }
-
-  // Consult the anchor registry. It returns OK when a minimum is known and
-  // NotFound when no anchors are registered.
-  int64_t earliest_anchor_index;
-  const Status anchor_status =
-      log_anchor_registry_->GetEarliestRegisteredLogIndex(&earliest_anchor_index);
-  if (PREDICT_FALSE(!anchor_status.ok())) {
-    DCHECK(anchor_status.IsNotFound())
-        << "Unexpected error calling LogAnchorRegistry: " << anchor_status.ToString();
-  } else {
-    indexes.for_durability = std::min(indexes.for_durability, earliest_anchor_index);
-  }
-
-  // Consult the TransactionTracker.
-  vector<scoped_refptr<TransactionDriver> > in_flight;
-  txn_tracker_.GetPendingTransactions(&in_flight);
-  for (const scoped_refptr<TransactionDriver>& txn_driver : in_flight) {
-    OpId op_id = txn_driver->GetOpId();
-    // A transaction without an opid hasn't been submitted for replication yet
-    // and thus has no need to anchor the log.
-    if (!op_id.IsInitialized()) {
-      continue;
-    }
-    indexes.for_durability = std::min(indexes.for_durability, op_id.index());
-  }
-
-  return indexes;
-}
-
-// Fills 'replay_size_map' from the log (see Log::GetReplaySizeMap()).
-// Returns the CheckRunning() error, leaving the map untouched, if the peer is
-// not RUNNING.
-Status TabletPeer::GetReplaySizeMap(map<int64_t, int64_t>* replay_size_map) const {
-  const Status running = CheckRunning();
-  if (!running.ok()) {
-    return running;
-  }
-  log_->GetReplaySizeMap(replay_size_map);
-  return Status::OK();
-}
-
-// Stores in 'retention_size' the value the log reports as GC-able for the
-// current retention indexes. Returns the CheckRunning() error, leaving the
-// output untouched, if the peer is not RUNNING.
-Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
-  const Status running = CheckRunning();
-  if (!running.ok()) {
-    return running;
-  }
-  *retention_size = log_->GetGCableDataSize(GetRetentionIndexes());
-  return Status::OK();
-}
-
-// Creates and starts a REPLICA-mode transaction for the operation carried by
-// 'round'.
-//
-// Returns IllegalState (with the state name as the message) unless the peer is
-// RUNNING or BOOTSTRAPPING. WRITE_OP and ALTER_SCHEMA_OP are the only supported
-// operation types; anything else hits LOG(FATAL). Errors from creating or
-// starting the transaction driver are propagated.
-Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
- {
- std::lock_guard<simple_spinlock> lock(lock_);
- if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
- return Status::IllegalState(TabletStatePB_Name(state_));
- }
- }
-
- // Build the transaction that matches the replicated message's operation type.
- consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
- DCHECK(replicate_msg->has_timestamp());
- gscoped_ptr<Transaction> transaction;
- switch (replicate_msg->op_type()) {
- case WRITE_OP:
- {
- DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
- " transaction must receive a WriteRequestPB";
- // The request id is forwarded only when the message carries one.
- unique_ptr<WriteTransactionState> tx_state(
- new WriteTransactionState(
- this,
- &replicate_msg->write_request(),
- replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
- tx_state->SetResultTracker(result_tracker_);
-
- transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
- break;
- }
- case ALTER_SCHEMA_OP:
- {
- DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
- " transaction must receive an AlterSchemaRequestPB";
- unique_ptr<AlterSchemaTransactionState> tx_state(
- new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(),
- nullptr));
- transaction.reset(
- new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA));
- break;
- }
- default:
- LOG(FATAL) << "Unsupported Operation Type";
- }
-
- // TODO(todd) Look at wiring the stuff below on the driver
- // Grab the state pointer before 'transaction' is moved into the driver below.
- TransactionState* state = transaction->state();
- state->set_consensus_round(round);
-
- scoped_refptr<TransactionDriver> driver;
- RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
-
- // Unretained is required to avoid a refcount cycle.
- state->consensus_round()->SetConsensusReplicatedCallback(
- Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
-
- RETURN_NOT_OK(driver->ExecuteAsync());
- return Status::OK();
-}
-
-// Creates a TransactionDriver wired to this peer's tracker, consensus, log,
-// thread pools and order verifier, and initializes it with 'transaction' in
-// LEADER mode. On success the new driver is swapped into '*driver'; on an
-// Init() failure '*driver' is left untouched.
-Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                              scoped_refptr<TransactionDriver>* driver) {
-  scoped_refptr<TransactionDriver> new_driver = new TransactionDriver(&txn_tracker_,
-                                                                      consensus_.get(),
-                                                                      log_.get(),
-                                                                      prepare_pool_.get(),
-                                                                      apply_pool_,
-                                                                      &txn_order_verifier_);
-  const Status init_status = new_driver->Init(std::move(transaction), consensus::LEADER);
-  if (!init_status.ok()) {
-    return init_status;
-  }
-  driver->swap(new_driver);
-  return Status::OK();
-}
-
-// Creates a TransactionDriver wired to this peer's tracker, consensus, log,
-// thread pools and order verifier, and initializes it with 'transaction' in
-// REPLICA mode. On success the new driver is swapped into '*driver'; on an
-// Init() failure '*driver' is left untouched.
-Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
-                                               scoped_refptr<TransactionDriver>* driver) {
-  scoped_refptr<TransactionDriver> new_driver = new TransactionDriver(&txn_tracker_,
-                                                                      consensus_.get(),
-                                                                      log_.get(),
-                                                                      prepare_pool_.get(),
-                                                                      apply_pool_,
-                                                                      &txn_order_verifier_);
-  const Status init_status = new_driver->Init(std::move(transaction), consensus::REPLICA);
-  if (!init_status.ok()) {
-    return init_status;
-  }
-  driver->swap(new_driver);
-  return Status::OK();
-}
-
-// Registers this peer's maintenance operations (MRS flush, DMS flush and log
-// GC) with 'maint_mgr', then asks the tablet to register its own.
-//
-// Does nothing except log a warning if the peer is not in the RUNNING state.
-// Ownership of each created op is transferred to 'maintenance_ops_' once it has
-// been registered.
-void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
-  // Taking state_change_lock_ ensures that we don't shut down concurrently with
-  // this last start-up task.
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
-
-  if (state() != RUNNING) {
-    // Log the tablet id: streaming the 'tablet_' smart pointer itself would
-    // only print a pointer value, which identifies nothing to an operator.
-    LOG(WARNING) << "Not registering maintenance operations for tablet " << tablet_id()
-                 << ": tablet not in RUNNING state";
-    return;
-  }
-
-  DCHECK(maintenance_ops_.empty());
-
-  gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
-  maint_mgr->RegisterOp(mrs_flush_op.get());
-  maintenance_ops_.push_back(mrs_flush_op.release());
-
-  gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
-  maint_mgr->RegisterOp(dms_flush_op.get());
-  maintenance_ops_.push_back(dms_flush_op.release());
-
-  gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
-  maint_mgr->RegisterOp(log_gc.get());
-  maintenance_ops_.push_back(log_gc.release());
-
-  tablet_->RegisterMaintenanceOps(maint_mgr);
-}
-
-// Unregisters every op held in 'maintenance_ops_' and then deletes them,
-// leaving the list empty. The caller must already hold 'state_change_lock_'
-// (checked in debug builds).
-void TabletPeer::UnregisterMaintenanceOps() {
-  DCHECK(state_change_lock_.is_locked());
-  for (auto it = maintenance_ops_.begin(); it != maintenance_ops_.end(); ++it) {
-    (*it)->Unregister();
-  }
-  STLDeleteElements(&maintenance_ops_);
-}
-
-// Waits for all transactions that are currently applying to commit in MVCC,
-// then waits for the log to flush everything queued so far.
-//
-// Returns the error from Log::WaitUntilAllFlushed() if the flush fails, and OK
-// otherwise. Each wait logs a warning if it takes longer than 200ms.
-Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
- // This callback is triggered prior to any TabletMetadata flush.
- // The guarantee that we are trying to enforce is this:
- //
- // If an operation has been flushed to stable storage (eg a DRS or DeltaFile)
- // then its COMMIT message must be present in the log.
- //
- // The purpose for this is so that, during bootstrap, we can accurately identify
- // whether each operation has been flushed. If we don't see a COMMIT message for
- // an operation, then we assume it was not completely applied and needs to be
- // re-applied. Thus, if we had something on disk but with no COMMIT message,
- // we'd attempt to double-apply the write, resulting in an error (eg trying to
- // delete an already-deleted row).
- //
- // So, to enforce this property, we do two steps:
- //
- // 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC.
- //
- // Because the operations always enqueue their COMMIT message to the log
- // before calling Commit(), this ensures that any in-flight operations have
- // their commit messages "en route".
- //
- // NOTE: we only wait for those operations that have started their Apply() phase.
- // Any operations which haven't yet started applying haven't made any changes
- // to in-memory state: thus, they obviously couldn't have made any changes to
- // on-disk storage either (data can only get to the disk by going through an in-memory
- // store). Only those that have started Apply() could have potentially written some
- // data which is now on disk.
- //
- // Perhaps more importantly, if we waited on operations that hadn't started their
- // Apply() phase, we might be waiting forever -- for example, if a follower has been
- // partitioned from its leader, it may have operations sitting around in flight
- // for quite a long time before eventually aborting or committing. This would
- // end up blocking all flushes if we waited on it.
- //
- // 2) Flush the log
- //
- // This ensures that the above-mentioned commit messages are not just enqueued
- // to the log, but also on disk.
- VLOG(1) << "T " << tablet_->metadata()->tablet_id()
- << ": Waiting for in-flight transactions to commit.";
- // Step 1: wait for transactions that are mid-Apply() to commit.
- LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") {
- tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit();
- }
- VLOG(1) << "T " << tablet_->metadata()->tablet_id()
- << ": Waiting for the log queue to be flushed.";
- // Step 2: wait for the log queue to be flushed; a failure here is returned.
- LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") {
- RETURN_NOT_OK(log_->WaitUntilAllFlushed());
- }
- return Status::OK();
-}
-
-
-} // namespace tablet
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer.h b/src/kudu/tablet/tablet_peer.h
deleted file mode 100644
index d622626..0000000
--- a/src/kudu/tablet/tablet_peer.h
+++ /dev/null
@@ -1,377 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_TABLET_TABLET_PEER_H_
-#define KUDU_TABLET_TABLET_PEER_H_
-
-#include <map>
-#include <memory>
-#include <mutex>
-#include <string>
-#include <vector>
-
-#include "kudu/consensus/consensus.h"
-#include "kudu/consensus/log.h"
-#include "kudu/consensus/time_manager.h"
-#include "kudu/gutil/callback.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/tablet/tablet.h"
-#include "kudu/tablet/transaction_order_verifier.h"
-#include "kudu/tablet/transactions/transaction_tracker.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/semaphore.h"
-
-namespace kudu {
-
-namespace log {
-class LogAnchorRegistry;
-}
-
-namespace rpc {
-class Messenger;
-class ResultTracker;
-}
-
-namespace tserver {
-class CatchUpServiceTest;
-}
-
-class MaintenanceManager;
-class MaintenanceOp;
-
-namespace tablet {
-class LeaderTransactionDriver;
-class ReplicaTransactionDriver;
-class TabletPeer;
-class TabletStatusPB;
-class TabletStatusListener;
-class TransactionDriver;
-
-// Narrow callback interface through which tablet-related processes report a
-// human-readable status back to their owner. It exists so that those processes
-// need no circular dependency on TabletPeer and can be tested without
-// constructing one.
-class TabletStatusListener {
- public:
-  virtual ~TabletStatusListener() = default;
-
-  // Reports 'status' as the latest human-readable status message.
-  virtual void StatusMessage(const std::string& status) = 0;
-};
-
-// A peer in a tablet consensus configuration, which coordinates writes to tablets.
-// Each time Write() is called this class appends a new entry to a replicated
-// state machine through a consensus algorithm, which makes sure that other
-// peers see the same updates in the same order. In addition to this, this
-// class also splits the work and coordinates multi-threaded execution.
-//
-// Instances are reference counted (RefCountedThreadSafe), so the destructor is
-// private.
-class TabletPeer : public RefCountedThreadSafe<TabletPeer>,
- public consensus::ReplicaTransactionFactory,
- public TabletStatusListener {
- public:
- TabletPeer(const scoped_refptr<TabletMetadata>& meta,
- const consensus::RaftPeerPB& local_peer_pb, ThreadPool* apply_pool,
- Callback<void(const std::string& reason)> mark_dirty_clbk);
-
- // Initializes the TabletPeer, namely creating the Log and initializing
- // Consensus.
- Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
- const scoped_refptr<server::Clock>& clock,
- const std::shared_ptr<rpc::Messenger>& messenger,
- const scoped_refptr<rpc::ResultTracker>& result_tracker,
- const scoped_refptr<log::Log>& log,
- const scoped_refptr<MetricEntity>& metric_entity);
-
- // Starts the TabletPeer, making it available for Write()s. If this
- // TabletPeer is part of a consensus configuration this will connect it to other peers
- // in the consensus configuration.
- Status Start(const consensus::ConsensusBootstrapInfo& info);
-
- // Shutdown this tablet peer.
- // If a shutdown is already in progress, blocks until that shutdown is complete.
- void Shutdown();
-
- // Check that the tablet is in a RUNNING state.
- Status CheckRunning() const;
-
- // Wait until the tablet is in a RUNNING state or if there's a timeout.
- // TODO have a way to wait for any state?
- Status WaitUntilConsensusRunning(const MonoDelta& timeout);
-
- // Submits a write to a tablet and executes it asynchronously.
- // The caller is expected to build and pass a TransactionContext that points
- // to the RPC WriteRequest, WriteResponse, RpcContext and to the tablet's
- // MvccManager.
- Status SubmitWrite(std::unique_ptr<WriteTransactionState> tx_state);
-
- // Called by the tablet service to start an alter schema transaction.
- //
- // The transaction contains all the information required to execute the
- // AlterSchema operation and send the response back.
- //
- // If the returned Status is OK, the response to the client will be sent
- // asynchronously. Otherwise the tablet service will have to send the response directly.
- //
- // The AlterSchema operation is taking the tablet component lock in exclusive mode
- // meaning that no other operation on the tablet can be executed while the
- // AlterSchema is in progress.
- Status SubmitAlterSchema(std::unique_ptr<AlterSchemaTransactionState> tx_state);
-
- // Fills 'status_pb_out' with a snapshot of this peer's status.
- void GetTabletStatusPB(TabletStatusPB* status_pb_out) const;
-
- // Used by consensus to create and start a new ReplicaTransaction.
- virtual Status StartReplicaTransaction(
- const scoped_refptr<consensus::ConsensusRound>& round) OVERRIDE;
-
- // Returns a raw pointer to the consensus instance, read under 'lock_'.
- // The pointer itself is not protected once the lock is released; use
- // shared_consensus() to hold a reference.
- consensus::Consensus* consensus() {
- std::lock_guard<simple_spinlock> lock(lock_);
- return consensus_.get();
- }
-
- // Returns a counted reference to the consensus instance, read under 'lock_'.
- scoped_refptr<consensus::Consensus> shared_consensus() const {
- std::lock_guard<simple_spinlock> lock(lock_);
- return consensus_;
- }
-
- // Returns a raw pointer to the tablet, read under 'lock_'.
- // The pointer itself is not protected once the lock is released; use
- // shared_tablet() to share ownership.
- Tablet* tablet() const {
- std::lock_guard<simple_spinlock> lock(lock_);
- return tablet_.get();
- }
-
- // Returns the TimeManager owned by consensus.
- // NOTE(review): unlike the neighboring accessors this dereferences
- // 'consensus_' without taking 'lock_' and without a null check -- confirm
- // that callers only use it while consensus is set.
- scoped_refptr<consensus::TimeManager> time_manager() const {
- return consensus_->time_manager();
- }
-
- // Returns a shared owner of the tablet, read under 'lock_'.
- std::shared_ptr<Tablet> shared_tablet() const {
- std::lock_guard<simple_spinlock> lock(lock_);
- return tablet_;
- }
-
- // Returns the current runtime state, read under 'lock_'.
- const TabletStatePB state() const {
- std::lock_guard<simple_spinlock> lock(lock_);
- return state_;
- }
-
- // Returns the current Raft configuration.
- const consensus::RaftConfigPB RaftConfig() const;
-
- // If any peers in the consensus configuration lack permanent uuids, get them via an
- // RPC call and update.
- // TODO: move this to raft_consensus.h.
- Status UpdatePermanentUuids();
-
- // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
- // The peer must currently be in the NOT_STARTED state (CHECK-enforced).
- void SetBootstrapping() {
- std::lock_guard<simple_spinlock> lock(lock_);
- CHECK_EQ(NOT_STARTED, state_);
- state_ = BOOTSTRAPPING;
- }
-
- // Implementation of TabletStatusListener::StatusMessage().
- void StatusMessage(const std::string& status) override;
-
- // Retrieve the last human-readable status of this tablet peer.
- std::string last_status() const;
-
- // Sets the tablet state to FAILED additionally setting the error to the provided
- // one.
- void SetFailed(const Status& error);
-
- // Returns the error that occurred, when state is FAILED.
- Status error() const {
- std::lock_guard<simple_spinlock> lock(lock_);
- return error_;
- }
-
- // Returns a human-readable string indicating the state of the tablet.
- // Typically this looks like "NOT_STARTED", "TABLET_DATA_COPYING",
- // etc. For use in places like the Web UI.
- std::string HumanReadableState() const;
-
- // Adds list of transactions in-flight at the time of the call to 'out'.
- void GetInFlightTransactions(Transaction::TraceType trace_type,
- std::vector<consensus::TransactionStatusPB>* out) const;
-
- // Returns the log indexes to be retained for durability and to catch up peers.
- // Used for selection of log segments to delete during Log GC.
- log::RetentionIndexes GetRetentionIndexes() const;
-
- // See Log::GetReplaySizeMap(...).
- //
- // Returns a non-ok status if the tablet isn't running.
- Status GetReplaySizeMap(std::map<int64_t, int64_t>* replay_size_map) const;
-
- // Returns the amount of bytes that would be GC'd if RunLogGC() was called.
- //
- // Returns a non-ok status if the tablet isn't running.
- Status GetGCableDataSize(int64_t* retention_size) const;
-
- // Return a pointer to the Log.
- // TabletPeer keeps a reference to Log after Init().
- log::Log* log() const {
- return log_.get();
- }
-
- // Return a pointer to the clock held by this peer.
- server::Clock* clock() {
- return clock_.get();
- }
-
- // Return the registry of log anchors held by this peer.
- const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry() const {
- return log_anchor_registry_;
- }
-
- // Returns the tablet_id of the tablet managed by this TabletPeer.
- // Returns the correct tablet_id even if the underlying tablet is not available
- // yet.
- const std::string& tablet_id() const { return tablet_id_; }
-
- // Convenience method to return the permanent_uuid of this peer.
- // NOTE(review): dereferences 'tablet_' without taking 'lock_' and without a
- // null check -- confirm callers only use it while the tablet is set.
- std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
-
- // Creates a new TransactionDriver for 'transaction' in LEADER mode and
- // returns it through 'driver'.
- Status NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
- scoped_refptr<TransactionDriver>* driver);
-
- // Creates a new TransactionDriver for 'transaction' in REPLICA mode and
- // returns it through 'driver'.
- Status NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
- scoped_refptr<TransactionDriver>* driver);
-
- // Tells the tablet's log to garbage collect.
- Status RunLogGC();
-
- // Register the maintenance ops associated with this peer's tablet, also invokes
- // Tablet::RegisterMaintenanceOps().
- void RegisterMaintenanceOps(MaintenanceManager* maintenance_manager);
-
- // Unregister the maintenance ops associated with this peer's tablet.
- // This method is not thread safe.
- void UnregisterMaintenanceOps();
-
- // Return pointer to the transaction tracker for this peer.
- const TransactionTracker* transaction_tracker() const { return &txn_tracker_; }
-
- // Return the metadata of the tablet managed by this peer.
- const scoped_refptr<TabletMetadata>& tablet_metadata() const {
- return meta_;
- }
-
- // Marks the tablet as dirty so that it's included in the next heartbeat.
- void MarkTabletDirty(const std::string& reason) {
- mark_dirty_clbk_.Run(reason);
- }
-
- private:
- friend class RefCountedThreadSafe<TabletPeer>;
- friend class TabletPeerTest;
- FRIEND_TEST(TabletPeerTest, TestMRSAnchorPreventsLogGC);
- FRIEND_TEST(TabletPeerTest, TestDMSAnchorPreventsLogGC);
- FRIEND_TEST(TabletPeerTest, TestActiveTransactionPreventsLogGC);
-
- ~TabletPeer();
-
- // Wait until the TabletPeer is fully in SHUTDOWN state.
- void WaitUntilShutdown();
-
- // After bootstrap is complete and consensus is setup this initiates the transactions
- // that were not complete on bootstrap.
- // Not implemented yet. See .cc file.
- Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
- const consensus::ConsensusBootstrapInfo& bootstrap_info);
-
- // Metadata of the tablet managed by this peer.
- const scoped_refptr<TabletMetadata> meta_;
-
- // Id of the tablet managed by this peer; see tablet_id().
- const std::string tablet_id_;
-
- const consensus::RaftPeerPB local_peer_pb_;
-
- // Current runtime state and, when FAILED, the associated error.
- // Both are protected by 'lock_'.
- TabletStatePB state_;
- Status error_;
- TransactionTracker txn_tracker_;
- TransactionOrderVerifier txn_order_verifier_;
- scoped_refptr<log::Log> log_;
- std::shared_ptr<Tablet> tablet_;
- std::shared_ptr<rpc::Messenger> messenger_;
- scoped_refptr<consensus::Consensus> consensus_;
- simple_spinlock prepare_replicate_lock_;
-
- // Lock protecting state_, last_status_, as well as smart pointers to collaborating
- // classes such as tablet_ and consensus_.
- mutable simple_spinlock lock_;
-
- // The human-readable last status of the tablet, displayed on the web page, command line
- // tools, etc.
- std::string last_status_;
-
- // Lock taken during Init/Shutdown which ensures that only a single thread
- // attempts to perform major lifecycle operations (Init/Shutdown) at once.
- // This must be acquired before acquiring lock_ if they are acquired together.
- // We don't just use lock_ since the lifecycle operations may take a while
- // and we'd like other threads to be able to quickly poll the state_ variable
- // during them in order to reject RPCs, etc.
- mutable simple_spinlock state_change_lock_;
-
- // IMPORTANT: correct execution of PrepareTask assumes that 'prepare_pool_'
- // is single-threaded, moving to a multi-tablet setup where multiple TabletPeers
- // use the same 'prepare_pool_' needs to enforce that, for a single
- // TabletPeer, PrepareTasks are executed *serially*.
- // TODO move the prepare pool to TabletServer.
- gscoped_ptr<ThreadPool> prepare_pool_;
-
- // Pool that executes apply tasks for transactions. This is a multi-threaded
- // pool, constructor-injected by either the Master (for system tables) or
- // the Tablet server.
- ThreadPool* apply_pool_;
-
- scoped_refptr<server::Clock> clock_;
-
- scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
-
- // Function to mark this TabletPeer's tablet as dirty in the TSTabletManager.
- //
- // Must be called whenever cluster membership or leadership changes, or when
- // the tablet's schema changes.
- Callback<void(const std::string& reason)> mark_dirty_clbk_;
-
- // List of maintenance operations for the tablet that need information that only the peer
- // can provide.
- std::vector<MaintenanceOp*> maintenance_ops_;
-
- // The result tracker for writes.
- scoped_refptr<rpc::ResultTracker> result_tracker_;
-
- DISALLOW_COPY_AND_ASSIGN(TabletPeer);
-};
-
-// Callback object that waits for in-flight transactions to complete and then
-// flushes the Log.
-//
-// The tablet is held as a raw pointer on purpose: this callback is set in
-// TabletMetadata, so keeping the tablet as a shared_ptr would create a circular
-// dependency (callback->tablet->metadata->callback). Since the tablet
-// indirectly owns this callback, we know that it must still be alive when the
-// callback fires.
-class FlushInflightsToLogCallback : public RefCountedThreadSafe<FlushInflightsToLogCallback> {
- public:
-  // Stores 'tablet' without taking ownership and keeps a reference to 'log'.
-  FlushInflightsToLogCallback(Tablet* tablet, const scoped_refptr<log::Log>& log)
-      : tablet_(tablet), log_(log) {
-  }
-
-  // Waits for applying transactions to commit, then for the log to flush.
-  Status WaitForInflightsAndFlushLog();
-
- private:
-  // Not owned; see the class comment.
-  Tablet* tablet_;
-
-  // Log that is flushed by WaitForInflightsAndFlushLog().
-  scoped_refptr<log::Log> log_;
-};
-
-
-} // namespace tablet
-} // namespace kudu
-
-#endif /* KUDU_TABLET_TABLET_PEER_H_ */
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.cc b/src/kudu/tablet/tablet_peer_mm_ops.cc
deleted file mode 100644
index 508d69f..0000000
--- a/src/kudu/tablet/tablet_peer_mm_ops.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/tablet/tablet_peer_mm_ops.h"
-
-#include <algorithm>
-#include <gflags/gflags.h>
-#include <map>
-#include <mutex>
-#include <string>
-
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/tablet/tablet_metrics.h"
-#include "kudu/util/flag_tags.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/metrics.h"
-
- // Memory threshold, in MB, read by
- // FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush() to decide whether
- // a flush is scored by how far the anchored memory is over the threshold.
- DEFINE_int32(flush_threshold_mb, 1024,
-              "Size at which MemRowSet flushes are triggered. "
-              "A MRS can still flush below this threshold if it hasn't flushed in a while, "
-              "or if the server-wide memory limit has been reached.");
- TAG_FLAG(flush_threshold_mb, experimental);
-
- // Age threshold, in seconds, read by
- // FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush() for time-based
- // flushing of stores that are below the size threshold.
- DEFINE_int32(flush_threshold_secs, 2 * 60,
-              "Number of seconds after which a non-empty MemRowSet will become flushable "
-              "even if it is not large.");
- TAG_FLAG(flush_threshold_secs, experimental);
-
-
- // Tablet-level gauge counting the log GC operations currently running.
- // Instantiated by the LogGCOp constructor.
- METRIC_DEFINE_gauge_uint32(tablet, log_gc_running,
- "Log GCs Running",
- kudu::MetricUnit::kOperations,
- "Number of log GC operations currently running.");
- // Tablet-level histogram of the time spent garbage collecting the logs, in
- // milliseconds. Instantiated by the LogGCOp constructor.
- // NOTE(review): the trailing arguments (60000LU, 1) are presumably the maximum
- // trackable value and the number of significant digits -- confirm against the
- // METRIC_DEFINE_histogram macro.
- METRIC_DEFINE_histogram(tablet, log_gc_duration,
- "Log GC Duration",
- kudu::MetricUnit::kMilliseconds,
- "Time spent garbage collecting the logs.", 60000LU, 1);
-
-namespace kudu {
-namespace tablet {
-
-using std::map;
-using strings::Substitute;
-
-// Upper bound for how long it takes to reach "full perf improvement" in time-based flushing.
-const double kFlushUpperBoundMs = 60 * 60 * 1000;
-
-//
-// FlushOpPerfImprovementPolicy.
-//
-
- // Computes the perf-improvement score of a flush op and stores it in 'stats'.
- //
- // 'stats' must already carry the op's ram_anchored() value, in bytes.
- // 'elapsed_ms' is the time since the last flush, in milliseconds.
- //
- // Scoring:
- //  * Anchored memory above --flush_threshold_mb: the score is the number of MB
- //    by which the threshold is exceeded.
- //  * Otherwise, more than --flush_threshold_secs elapsed: the score is
- //    elapsed_ms / kFlushUpperBoundMs, capped at 1.0.
- //  * Otherwise 'stats' is left untouched.
- void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats* stats,
-                                                               double elapsed_ms) {
-   // Do the arithmetic in floating-point MB so that large flag values cannot
-   // overflow 32-bit integer math the way 'FLAGS_flush_threshold_mb * 1024 * 1024' could.
-   const double anchored_mb = static_cast<double>(stats->ram_anchored()) / (1024 * 1024);
-   const double threshold_mb = static_cast<double>(FLAGS_flush_threshold_mb);
-
-   if (anchored_mb > threshold_mb) {
-     // If we're over the user-specified flush threshold, then consider the perf
-     // improvement to be 1 for every extra MB. This produces perf_improvement results
-     // which are much higher than any compaction would produce, and means that, when
-     // there is an MRS over threshold, a flush will almost always be selected instead of
-     // a compaction. That's not necessarily a good thing, but in the absence of better
-     // heuristics, it will do for now.
-     //
-     // Note the operand order: anchored minus threshold. The reverse order yields a
-     // score that is zero or negative exactly when the store is over the threshold.
-     stats->set_perf_improvement(anchored_mb - threshold_mb);
-   } else if (elapsed_ms > FLAGS_flush_threshold_secs * 1000.0) {
-     // Even if we aren't over the threshold, consider flushing if we haven't flushed
-     // in a long time. But, don't give it a large perf_improvement score. We should
-     // only do this if we really don't have much else to do, and if we've already waited
-     // a bit. The score grows from 0.0 towards 1.0 as 'elapsed_ms' approaches
-     // 'kFlushUpperBoundMs' and never exceeds 1.0.
-     stats->set_perf_improvement(std::min(elapsed_ms / kFlushUpperBoundMs, 1.0));
-   }
- }
-
-//
-// FlushMRSOp.
-//
-
- // Fills 'stats' for a MemRowSet flush. 'stats' is left untouched when the
- // MemRowSet is empty or the replay size map cannot be obtained.
- void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
-   std::lock_guard<simple_spinlock> guard(lock_);
-
-   // An empty MemRowSet has nothing to flush.
-   if (tablet_peer_->tablet()->MemRowSetEmpty()) {
-     return;
-   }
-
-   map<int64_t, int64_t> replay_sizes;
-   if (!tablet_peer_->GetReplaySizeMap(&replay_sizes).ok()) {
-     return;
-   }
-
-   // The op is reported as runnable only if the flush semaphore can be taken
-   // right now. The unique_lock gives it back when this scope ends.
-   {
-     std::unique_lock<Semaphore> flush_probe(tablet_peer_->tablet()->rowsets_flush_sem_,
-                                             std::defer_lock);
-     const bool acquired = flush_probe.try_lock();
-     stats->set_runnable(acquired);
-   }
-
-   stats->set_ram_anchored(tablet_peer_->tablet()->MemRowSetSize());
-   stats->set_logs_retained_bytes(
-       tablet_peer_->tablet()->MemRowSetLogReplaySize(replay_sizes));
-
-   // TODO(todd): use workload statistics here to find out how "hot" the tablet has
-   // been in the last 5 minutes.
-   const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
-   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
- }
-
- // Attempts to take rowsets_flush_sem_ without blocking and reports whether that
- // succeeded. Failing to get it fails the Prepare step, which also means that
- // only one instance of FlushMRSOp can be running at once.
- bool FlushMRSOp::Prepare() {
-   const bool acquired = tablet_peer_->tablet()->rowsets_flush_sem_.try_lock();
-   return acquired;
- }
-
- // Flushes the tablet's MemRowSet, restarts the time-since-flush stopwatch and
- // releases rowsets_flush_sem_.
- void FlushMRSOp::Perform() {
- // Prepare() acquires rowsets_flush_sem_, so a further try_lock() here is
- // required to fail.
- CHECK(!tablet_peer_->tablet()->rowsets_flush_sem_.try_lock());
-
- // NOTE(review): KUDU_CHECK_OK_PREPEND presumably aborts on a non-OK status, in
- // which case the unlock below is never reached -- confirm against the macro.
- KUDU_CHECK_OK_PREPEND(tablet_peer_->tablet()->FlushUnlocked(),
- Substitute("FlushMRS failed on $0", tablet_peer_->tablet_id()));
-
- {
- // lock_ protects time_since_flush_, which UpdateStats() reads.
- std::lock_guard<simple_spinlock> l(lock_);
- time_since_flush_.start();
- }
- // Give back the semaphore taken in Prepare().
- tablet_peer_->tablet()->rowsets_flush_sem_.unlock();
- }
-
- // Returns the tablet metrics' flush_mrs_duration histogram.
- scoped_refptr<Histogram> FlushMRSOp::DurationHistogram() const {
-   scoped_refptr<Histogram> duration = tablet_peer_->tablet()->metrics()->flush_mrs_duration;
-   return duration;
- }
-
- // Returns the tablet metrics' flush_mrs_running gauge.
- scoped_refptr<AtomicGauge<uint32_t> > FlushMRSOp::RunningGauge() const {
-   scoped_refptr<AtomicGauge<uint32_t> > running =
-       tablet_peer_->tablet()->metrics()->flush_mrs_running;
-   return running;
- }
-
-//
-// FlushDeltaMemStoresOp.
-//
-
- // Fills 'stats' from the DMS that the tablet reports as the best one to flush.
- // 'stats' is left untouched when the tablet has no delta data in memory or the
- // replay size map cannot be obtained.
- void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
-   std::lock_guard<simple_spinlock> guard(lock_);
-
-   if (tablet_peer_->tablet()->DeltaMemRowSetEmpty()) {
-     return;
-   }
-
-   map<int64_t, int64_t> replay_sizes;
-   if (!tablet_peer_->GetReplaySizeMap(&replay_sizes).ok()) {
-     return;
-   }
-
-   // Both values are output parameters filled in by the tablet.
-   int64_t best_dms_bytes;
-   int64_t retained_log_bytes;
-   tablet_peer_->tablet()->GetInfoForBestDMSToFlush(replay_sizes,
-                                                    &best_dms_bytes, &retained_log_bytes);
-
-   stats->set_ram_anchored(best_dms_bytes);
-   stats->set_runnable(true);
-   stats->set_logs_retained_bytes(retained_log_bytes);
-
-   const double ms_since_flush = time_since_flush_.elapsed().wall_millis();
-   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(stats, ms_since_flush);
- }
-
- // Flushes the DMS with the highest retention and restarts the time-since-flush
- // stopwatch. Only logs a warning and does nothing else when the replay size map
- // cannot be obtained.
- void FlushDeltaMemStoresOp::Perform() {
-   map<int64_t, int64_t> replay_sizes;
-   if (tablet_peer_->GetReplaySizeMap(&replay_sizes).ok()) {
-     KUDU_CHECK_OK_PREPEND(
-         tablet_peer_->tablet()->FlushDMSWithHighestRetention(replay_sizes),
-         Substitute("Failed to flush DMS on $0", tablet_peer_->tablet()->tablet_id()));
-
-     // lock_ protects time_since_flush_; it is released at the end of this branch.
-     std::lock_guard<simple_spinlock> guard(lock_);
-     time_since_flush_.start();
-   } else {
-     LOG(WARNING) << "Won't flush deltas since tablet shutting down: " << tablet_peer_->tablet_id();
-   }
- }
-
- // Returns the tablet metrics' flush_dms_duration histogram.
- scoped_refptr<Histogram> FlushDeltaMemStoresOp::DurationHistogram() const {
-   scoped_refptr<Histogram> duration = tablet_peer_->tablet()->metrics()->flush_dms_duration;
-   return duration;
- }
-
- // Returns the tablet metrics' flush_dms_running gauge.
- scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() const {
-   scoped_refptr<AtomicGauge<uint32_t> > running =
-       tablet_peer_->tablet()->metrics()->flush_dms_running;
-   return running;
- }
-
-//
-// LogGCOp.
-//
-
- // Builds a low-IO maintenance op named "LogGCOp(<tablet id>)", instantiates the
- // log GC duration histogram and running gauge on the tablet's metric entity,
- // and initializes the semaphore with a value of 1.
- LogGCOp::LogGCOp(TabletPeer* tablet_peer)
-     : MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_peer->tablet()->tablet_id().c_str()),
-                     MaintenanceOp::LOW_IO_USAGE),
-       tablet_peer_(tablet_peer),
-       log_gc_duration_(
-           METRIC_log_gc_duration.Instantiate(tablet_peer->tablet()->GetMetricEntity())),
-       log_gc_running_(
-           METRIC_log_gc_running.Instantiate(tablet_peer->tablet()->GetMetricEntity(), 0)),
-       sem_(1) {}
-
- // Reports the GC-able log size and whether the op can run. 'stats' is left
- // untouched when the peer cannot provide the GC-able data size.
- void LogGCOp::UpdateStats(MaintenanceOpStats* stats) {
-   int64_t gcable_bytes;
-   if (tablet_peer_->GetGCableDataSize(&gcable_bytes).ok()) {
-     stats->set_logs_retained_bytes(gcable_bytes);
-     // Runnable only while the semaphore's value is still 1.
-     stats->set_runnable(sem_.GetValue() == 1);
-   }
- }
-
- // Attempts to take sem_ without blocking and reports whether that succeeded.
- bool LogGCOp::Prepare() {
-   const bool acquired = sem_.try_lock();
-   return acquired;
- }
-
- // Runs log GC on the tablet peer and then releases sem_.
- void LogGCOp::Perform() {
- // Prepare() acquires sem_, so a further try_lock() here is required to fail.
- CHECK(!sem_.try_lock());
-
- tablet_peer_->RunLogGC();
-
- // Give back the semaphore taken in Prepare().
- sem_.unlock();
- }
-
- // Returns the histogram instantiated in the constructor.
- scoped_refptr<Histogram> LogGCOp::DurationHistogram() const {
-   scoped_refptr<Histogram> duration = log_gc_duration_;
-   return duration;
- }
-
- // Returns the gauge instantiated in the constructor.
- scoped_refptr<AtomicGauge<uint32_t> > LogGCOp::RunningGauge() const {
-   scoped_refptr<AtomicGauge<uint32_t> > running = log_gc_running_;
-   return running;
- }
-
-} // namespace tablet
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/7f72105b/src/kudu/tablet/tablet_peer_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_peer_mm_ops.h b/src/kudu/tablet/tablet_peer_mm_ops.h
deleted file mode 100644
index a985802..0000000
--- a/src/kudu/tablet/tablet_peer_mm_ops.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_TABLET_TABLET_PEER_MM_OPS_H_
-#define KUDU_TABLET_TABLET_PEER_MM_OPS_H_
-
-#include "kudu/tablet/tablet_peer.h"
-#include "kudu/util/maintenance_manager.h"
-#include "kudu/util/stopwatch.h"
-
-namespace kudu {
-
-class Histogram;
-template<class T>
-class AtomicGauge;
-
-namespace tablet {
-
- // Holder for the flush scoring policy. Its only functionality is a static
- // method, and the constructor is private.
- class FlushOpPerfImprovementPolicy {
-  public:
-   ~FlushOpPerfImprovementPolicy() {}
-
-   // Sets the performance improvement on 'stats'. If the anchored RAM is over the
-   // threshold the score is based on that; otherwise it is based on how long it
-   // has been since the last flush ('elapsed_ms', in milliseconds).
-   static void SetPerfImprovementForFlush(MaintenanceOpStats* stats, double elapsed_ms);
-
-  private:
-   FlushOpPerfImprovementPolicy() {}
- };
-
- // Maintenance op that flushes a tablet's MemRowSet (MRS).
- // Only one such flush can happen at a time.
- class FlushMRSOp : public MaintenanceOp {
-  public:
-   // Creates a high-IO op named "FlushMRSOp(<tablet id>)" for 'tablet_peer' and
-   // starts the time-since-flush stopwatch.
-   explicit FlushMRSOp(TabletPeer* tablet_peer)
-       : MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_peer->tablet()->tablet_id().c_str()),
-                       MaintenanceOp::HIGH_IO_USAGE),
-         tablet_peer_(tablet_peer) {
-     time_since_flush_.start();
-   }
-
-   // MaintenanceOp overrides.
-   virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-
-   virtual bool Prepare() OVERRIDE;
-
-   virtual void Perform() OVERRIDE;
-
-   virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-
-   virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
-
-  private:
-   // Guards time_since_flush_.
-   mutable simple_spinlock lock_;
-   Stopwatch time_since_flush_;
-
-   // The peer this op operates on; the pointer itself is never reseated.
-   TabletPeer *const tablet_peer_;
- };
-
- // Maintenance op that flushes DeltaMemStores (DMS).
- // Stats are reported for all the DMS this tablet contains, but a single
- // Perform() call flushes only one of them.
- class FlushDeltaMemStoresOp : public MaintenanceOp {
-  public:
-   // Creates a high-IO op named "FlushDeltaMemStoresOp(<tablet id>)" for
-   // 'tablet_peer' and starts the time-since-flush stopwatch.
-   explicit FlushDeltaMemStoresOp(TabletPeer* tablet_peer)
-       : MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
-                                    tablet_peer->tablet()->tablet_id().c_str()),
-                       MaintenanceOp::HIGH_IO_USAGE),
-         tablet_peer_(tablet_peer) {
-     time_since_flush_.start();
-   }
-
-   // MaintenanceOp overrides.
-   virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-
-   // Always succeeds: this op takes no lock or semaphore before Perform().
-   virtual bool Prepare() OVERRIDE {
-     return true;
-   }
-
-   virtual void Perform() OVERRIDE;
-
-   virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-
-   virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
-
-  private:
-   // Guards time_since_flush_.
-   mutable simple_spinlock lock_;
-   Stopwatch time_since_flush_;
-
-   // The peer this op operates on; the pointer itself is never reseated.
-   TabletPeer *const tablet_peer_;
- };
-
- // Maintenance op that runs log GC. The log retention it reports represents the
- // amount of data that can be GC'd.
- //
- // Only one LogGC op can run at a time.
- class LogGCOp : public MaintenanceOp {
-  public:
-   // Defined out of line.
-   explicit LogGCOp(TabletPeer* tablet_peer);
-
-   // MaintenanceOp overrides.
-   virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
-
-   virtual bool Prepare() OVERRIDE;
-
-   virtual void Perform() OVERRIDE;
-
-   virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
-
-   virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
-
-  private:
-   // The peer this op operates on; the pointer itself is never reseated.
-   TabletPeer *const tablet_peer_;
-   // Handed out by DurationHistogram() and RunningGauge() respectively.
-   scoped_refptr<Histogram> log_gc_duration_;
-   scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
-   // Mutable so that const methods may use it.
-   mutable Semaphore sem_;
- };
-
-} // namespace tablet
-} // namespace kudu
-
-#endif /* KUDU_TABLET_TABLET_PEER_MM_OPS_H_ */