You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/08/22 00:27:21 UTC
[1/2] kudu git commit: consensus: Improve contract for API to fetch
last-logged op id
Repository: kudu
Updated Branches:
refs/heads/master fa2757763 -> a7d589635
consensus: Improve contract for API to fetch last-logged op id
It's important that we differentiate between the case where we know the
last-logged op and the case where we don't know what it is, or even whether
we have ever appended anything to the local WAL. This applies both to the
TABLET_DATA_READY case, where this information is stored in the WAL, and
the TABLET_DATA_TOMBSTONED case, where this information is stored in the
superblock.
Cases where we are unable to determine the last-logged OpId from the WAL
when a replica is in TABLET_DATA_READY state:
* Early in the tablet replica lifecycle (before Raft is started).
* When a replica encounters an error during initialization.
Cases where we are unable to determine the last-logged OpId from the
TabletMetadata when a replica is in TABLET_DATA_TOMBSTONED state:
* If the replica was tombstoned while in a failed state.
Included in this patch are the following API improvements:
1. Delete Log::GetLatestEntryOpId(). Previously, this method would only
return something other than MinimumOpId() if a log entry had been
appended during the object's lifetime. It is abandoned in favor of
RaftConsensus::GetLastOpId(RECEIVED_OPID), which delegates to
PeerMessageQueue::GetLastOpIdInLog().
2. Merge PeerMessageQueue::Init() into the PeerMessageQueue constructor.
This allows us to remove one lifecycle state and allows us to
guarantee that, once the queue is constructed, we can always get a
valid last-logged opid from it (see item 1 above).
3. Make TabletMetadata::tombstone_last_logged_opid() return a
boost::optional<OpId>. We need to clearly differentiate between when
we know the last-logged opid and when we don't. We also consider
MinimumOpId() to be equal to boost::none at superblock load time,
since previous versions of Kudu may have written (0,0) into the
TabletMetadata 'tombstone_last_logged_opid' field.
Change-Id: Ia4e4501a61cd40fdee0dc918b77675a0bc2515e7
Reviewed-on: http://gerrit.cloudera.org:8080/7717
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/dc65abba
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/dc65abba
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/dc65abba
Branch: refs/heads/master
Commit: dc65abbab208120288a8c26b1099f45c488b865e
Parents: fa27577
Author: Mike Percy <mp...@apache.org>
Authored: Tue Aug 15 19:58:39 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Aug 22 00:26:52 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus-test-util.h | 18 +--
src/kudu/consensus/consensus_peers-test.cc | 17 +--
src/kudu/consensus/consensus_queue-test.cc | 50 ++++-----
src/kudu/consensus/consensus_queue.cc | 41 +++----
src/kudu/consensus/consensus_queue.h | 17 ++-
src/kudu/consensus/log.cc | 21 ----
src/kudu/consensus/log.h | 12 --
src/kudu/consensus/mt-log-test.cc | 4 +-
src/kudu/consensus/raft_consensus.cc | 109 ++++++++++---------
src/kudu/consensus/raft_consensus.h | 14 +--
.../consensus/raft_consensus_quorum-test.cc | 11 +-
src/kudu/tablet/tablet_metadata.cc | 26 +++--
src/kudu/tablet/tablet_metadata.h | 7 +-
src/kudu/tablet/tablet_replica-test.cc | 24 ++--
src/kudu/tools/kudu-tool-test.cc | 16 +--
src/kudu/tserver/tablet_copy_client.cc | 33 ++++--
src/kudu/tserver/tablet_copy_source_session.cc | 8 +-
src/kudu/tserver/tablet_service.cc | 13 ++-
src/kudu/tserver/ts_tablet_manager.cc | 47 ++++----
19 files changed, 245 insertions(+), 243 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index b554dcd..ac4408b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -47,14 +47,16 @@
#define TOKENPASTE2(x, y) TOKENPASTE(x, y)
#define ASSERT_OPID_EQ(left, right) \
- OpId TOKENPASTE2(_left, __LINE__) = (left); \
- OpId TOKENPASTE2(_right, __LINE__) = (right); \
- if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right,__LINE__))) { \
- FAIL() << "Expected: " \
- << pb_util::SecureShortDebugString(TOKENPASTE2(_right,__LINE__)) << "\n" \
- << "Value: " \
- << pb_util::SecureShortDebugString(TOKENPASTE2(_left,__LINE__)) << "\n"; \
- }
+ do { \
+ const OpId& TOKENPASTE2(_left, __LINE__) = (left); \
+ const OpId& TOKENPASTE2(_right, __LINE__) = (right); \
+ if (!consensus::OpIdEquals(TOKENPASTE2(_left, __LINE__), TOKENPASTE2(_right, __LINE__))) { \
+ FAIL() << "Expected: " \
+ << pb_util::SecureShortDebugString(TOKENPASTE2(_left, __LINE__)) << "\n" \
+ << "Value: " \
+ << pb_util::SecureShortDebugString(TOKENPASTE2(_right, __LINE__)) << "\n"; \
+ } \
+ } while (false)
namespace kudu {
namespace consensus {
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index aac7346..8641e75 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -103,7 +103,9 @@ class ConsensusPeersTest : public KuduTest {
time_manager,
FakeRaftPeerPB(kLeaderUuid),
kTabletId,
- raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+ raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ MinimumOpId(),
+ MinimumOpId()));
MessengerBuilder bld("test");
ASSERT_OK(bld.Build(&messenger_));
@@ -133,13 +135,6 @@ class ConsensusPeersTest : public KuduTest {
return proxy_ptr;
}
- void CheckLastLogEntry(int term, int index) {
- OpId id;
- log_->GetLatestEntryOpId(&id);
- ASSERT_EQ(id.term(), term);
- ASSERT_EQ(id.index(), index);
- }
-
void CheckLastRemoteEntry(DelayablePeerProxy<NoOpTestPeerProxy>* proxy, int term, int index) {
OpId id;
id.CopyFrom(proxy->proxy()->last_received());
@@ -179,7 +174,6 @@ class ConsensusPeersTest : public KuduTest {
TEST_F(ConsensusPeersTest, TestRemotePeer) {
// We use a majority size of 2 since we make one fake remote peer
// in addition to our real local log.
- message_queue_->Init(MinimumOpId(), MinimumOpId());
message_queue_->SetLeaderMode(kMinimumOpIdIndex,
kMinimumTerm,
BuildRaftConfigPBForTests(3));
@@ -203,7 +197,6 @@ TEST_F(ConsensusPeersTest, TestRemotePeer) {
}
TEST_F(ConsensusPeersTest, TestRemotePeers) {
- message_queue_->Init(MinimumOpId(), MinimumOpId());
message_queue_->SetLeaderMode(kMinimumOpIdIndex,
kMinimumTerm,
BuildRaftConfigPBForTests(3));
@@ -233,7 +226,7 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
// of remote-peer1 and the local log.
WaitForCommitIndex(first.index());
- CheckLastLogEntry(first.term(), first.index());
+ ASSERT_OPID_EQ(first, message_queue_->GetLastOpIdInLog());
CheckLastRemoteEntry(remote_peer1_proxy, first.term(), first.index());
remote_peer2_proxy->Respond(TestPeerProxy::kUpdate);
@@ -262,7 +255,6 @@ TEST_F(ConsensusPeersTest, TestRemotePeers) {
// Regression test for KUDU-699: even if a peer isn't making progress,
// and thus always has data pending, we should be able to close the peer.
TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
- message_queue_->Init(MinimumOpId(), MinimumOpId());
message_queue_->SetLeaderMode(kMinimumOpIdIndex,
kMinimumTerm,
BuildRaftConfigPBForTests(3));
@@ -301,7 +293,6 @@ TEST_F(ConsensusPeersTest, TestCloseWhenRemotePeerDoesntMakeProgress) {
}
TEST_F(ConsensusPeersTest, TestDontSendOneRpcPerWriteWhenPeerIsDown) {
- message_queue_->Init(MinimumOpId(), MinimumOpId());
message_queue_->SetLeaderMode(kMinimumOpIdIndex,
kMinimumTerm,
BuildRaftConfigPBForTests(3));
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 8d6f30b..8722dde 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -96,19 +96,22 @@ class ConsensusQueueTest : public KuduTest {
ASSERT_OK(clock_->Init());
ASSERT_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
- CloseAndReopenQueue();
+ CloseAndReopenQueue(MinimumOpId(), MinimumOpId());
}
- void CloseAndReopenQueue() {
+ void CloseAndReopenQueue(const OpId& replicated_opid, const OpId& committed_opid) {
scoped_refptr<clock::Clock> clock(new clock::HybridClock());
ASSERT_OK(clock->Init());
scoped_refptr<TimeManager> time_manager(new TimeManager(clock, Timestamp::kMin));
- queue_.reset(new PeerMessageQueue(metric_entity_,
- log_.get(),
- time_manager,
- FakeRaftPeerPB(kLeaderUuid),
- kTestTablet,
- raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
+ queue_.reset(new PeerMessageQueue(
+ metric_entity_,
+ log_.get(),
+ time_manager,
+ FakeRaftPeerPB(kLeaderUuid),
+ kTestTablet,
+ raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ replicated_opid,
+ committed_opid));
}
virtual void TearDown() OVERRIDE {
@@ -229,7 +232,6 @@ class ConsensusQueueTest : public KuduTest {
// with several messages and then starts to track a peer whose watermark
// falls in the middle of the current messages in the queue.
TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
@@ -269,7 +271,6 @@ TEST_F(ConsensusQueueTest, TestStartTrackingAfterStart) {
// Tests that the peers gets the messages pages, with the size of a page
// being 'consensus_max_batch_size_bytes'
TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(2));
// helper to estimate request size so that we can set the max batch size appropriately
@@ -338,7 +339,6 @@ TEST_F(ConsensusQueueTest, TestGetPagedMessages) {
}
TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
@@ -405,7 +405,6 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
}
TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
// Track 4 additional peers (in addition to the local peer)
queue_->TrackPeer("peer-1");
@@ -488,7 +487,8 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
OpId opid = MakeOpId(1, 1);
- for (int i = 1; i <= 100; i++) {
+ const int kOpsToAppend = 100;
+ for (int i = 1; i <= kOpsToAppend; i++) {
ASSERT_OK(log::AppendNoOpToLogSync(clock_, log_.get(), &opid));
// Roll the log every 10 ops
if (i % 10 == 0) {
@@ -497,16 +497,15 @@ TEST_F(ConsensusQueueTest, TestQueueLoadsOperationsForPeer) {
}
ASSERT_OK(log_->WaitUntilAllFlushed());
- OpId leader_last_op;
- log_->GetLatestEntryOpId(&leader_last_op);
+ ASSERT_OPID_EQ(MakeOpId(1, kOpsToAppend + 1), opid);
+ OpId last_logged_opid = MakeOpId(opid.term(), opid.index() - 1);
// Now reset the queue so that we can pass a new committed index,
// the last operation in the log.
- CloseAndReopenQueue();
+ CloseAndReopenQueue(last_logged_opid, last_logged_opid);
- queue_->Init(leader_last_op, leader_last_op);
- queue_->SetLeaderMode(leader_last_op.index(),
- leader_last_op.term(),
+ queue_->SetLeaderMode(last_logged_opid.index(),
+ last_logged_opid.term(),
BuildRaftConfigPBForTests(3));
ConsensusRequestPB request;
@@ -570,14 +569,12 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
}
}
+ OpId last_in_log = MakeOpId(opid.term(), opid.index() - 1);
+ int64_t committed_index = 15;
// Now reset the queue so that we can pass a new committed index (15).
- CloseAndReopenQueue();
+ CloseAndReopenQueue(last_in_log, MakeOpId(2, committed_index));
- OpId last_in_log;
- log_->GetLatestEntryOpId(&last_in_log);
- int64_t committed_index = 15;
- queue_->Init(last_in_log, MakeOpId(2, committed_index));
queue_->SetLeaderMode(committed_index,
last_in_log.term(),
BuildRaftConfigPBForTests(3));
@@ -657,7 +654,6 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
// operations, which would cause a check failure on the write immediately
// following the overwriting write.
TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetNonLeaderMode();
// Append a bunch of messages and update as if they were also appeneded to the leader.
queue_->UpdateLastIndexAppendedToLeader(10);
@@ -725,7 +721,7 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
FLAGS_consensus_max_batch_size_bytes = 1024 * 10;
const int kInitialCommittedIndex = 30;
- queue_->Init(MakeOpId(72, 30), MakeOpId(82, 30));
+ CloseAndReopenQueue(MakeOpId(72, 30), MakeOpId(82, 30));
queue_->SetLeaderMode(kInitialCommittedIndex, 76, BuildRaftConfigPBForTests(3));
ConsensusRequestPB request;
@@ -819,7 +815,6 @@ TEST_F(ConsensusQueueTest, TestOnlyAdvancesWatermarkWhenPeerHasAPrefixOfOurLog)
// Test that Tablet Copy is triggered when a "tablet not found" error occurs.
TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(3));
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 100);
@@ -859,7 +854,6 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
}
TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
- queue_->Init(MinimumOpId(), MinimumOpId());
queue_->SetNonLeaderMode();
// Emulate a follower sending a request to replicate 10 messages.
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index b22b539..4445137 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -114,40 +114,38 @@ PeerMessageQueue::Metrics::Metrics(const scoped_refptr<MetricEntity>& metric_ent
}
#undef INSTANTIATE_METRIC
-PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
- const scoped_refptr<log::Log>& log,
+PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
+ scoped_refptr<log::Log> log,
scoped_refptr<TimeManager> time_manager,
- const RaftPeerPB& local_peer_pb,
- const string& tablet_id,
- unique_ptr<ThreadPoolToken> raft_pool_observers_token)
+ RaftPeerPB local_peer_pb,
+ string tablet_id,
+ unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+ OpId last_locally_replicated,
+ const OpId& last_locally_committed)
: raft_pool_observers_token_(std::move(raft_pool_observers_token)),
- local_peer_pb_(local_peer_pb),
- tablet_id_(tablet_id),
- log_cache_(metric_entity, log, local_peer_pb.permanent_uuid(), tablet_id),
+ local_peer_pb_(std::move(local_peer_pb)),
+ tablet_id_(std::move(tablet_id)),
+ log_cache_(metric_entity, log, local_peer_pb_.permanent_uuid(), tablet_id_),
metrics_(metric_entity),
time_manager_(std::move(time_manager)) {
DCHECK(local_peer_pb_.has_permanent_uuid());
DCHECK(local_peer_pb_.has_last_known_addr());
+ DCHECK(last_locally_replicated.IsInitialized());
+ DCHECK(last_locally_committed.IsInitialized());
queue_state_.current_term = 0;
queue_state_.first_index_in_current_term = boost::none;
queue_state_.committed_index = 0;
queue_state_.all_replicated_index = 0;
queue_state_.majority_replicated_index = 0;
queue_state_.last_idx_appended_to_leader = 0;
- queue_state_.state = kQueueConstructed;
queue_state_.mode = NON_LEADER;
queue_state_.majority_size_ = -1;
-}
-
-void PeerMessageQueue::Init(const OpId& last_locally_replicated,
- const OpId& last_locally_committed) {
- std::lock_guard<simple_spinlock> lock(queue_lock_);
- CHECK_EQ(queue_state_.state, kQueueConstructed);
- log_cache_.Init(last_locally_replicated);
- queue_state_.last_appended = last_locally_replicated;
- queue_state_.state = kQueueOpen;
+ queue_state_.last_appended = std::move(last_locally_replicated);
queue_state_.committed_index = last_locally_committed.index();
- TrackPeerUnlocked(local_peer_pb_.permanent_uuid());
+ queue_state_.state = kQueueOpen;
+ // TODO(mpercy): Merge LogCache::Init() with its constructor.
+ log_cache_.Init(queue_state_.last_appended);
+ TrackPeer(local_peer_pb_.permanent_uuid());
}
void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
@@ -322,6 +320,7 @@ Status PeerMessageQueue::AppendOperations(const vector<ReplicateRefPtr>& msgs,
last_id,
log_append_callback)));
lock.lock();
+ DCHECK(last_id.IsInitialized());
queue_state_.last_appended = last_id;
UpdateMetricsUnlocked();
@@ -337,6 +336,7 @@ void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
index));
{
std::unique_lock<simple_spinlock> lock(queue_lock_);
+ DCHECK(op.IsInitialized());
queue_state_.last_appended = op;
}
log_cache_.TruncateOpsAfter(op.index());
@@ -344,11 +344,13 @@ void PeerMessageQueue::TruncateOpsAfter(int64_t index) {
OpId PeerMessageQueue::GetLastOpIdInLog() const {
std::unique_lock<simple_spinlock> lock(queue_lock_);
+ DCHECK(queue_state_.last_appended.IsInitialized());
return queue_state_.last_appended;
}
OpId PeerMessageQueue::GetNextOpId() const {
std::unique_lock<simple_spinlock> lock(queue_lock_);
+ DCHECK(queue_state_.last_appended.IsInitialized());
return MakeOpId(queue_state_.current_term,
queue_state_.last_appended.index() + 1);
}
@@ -632,7 +634,6 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
Mode mode_copy;
{
std::lock_guard<simple_spinlock> scoped_lock(queue_lock_);
- DCHECK_NE(kQueueConstructed, queue_state_.state);
TrackedPeer* peer = FindPtrOrNull(peers_map_, peer_uuid);
if (PREDICT_FALSE(queue_state_.state != kQueueOpen || peer == nullptr)) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index dfb3712..976dd1d 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -139,16 +139,14 @@ class PeerMessageQueue {
int64_t last_seen_term_;
};
- PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_entity,
- const scoped_refptr<log::Log>& log,
+ PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
+ scoped_refptr<log::Log> log,
scoped_refptr<TimeManager> time_manager,
- const RaftPeerPB& local_peer_pb,
- const std::string& tablet_id,
- std::unique_ptr<ThreadPoolToken> raft_pool_observers_token);
-
- // Initialize the queue.
- void Init(const OpId& last_locally_replicated,
- const OpId& last_locally_committed);
+ RaftPeerPB local_peer_pb,
+ std::string tablet_id,
+ std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+ OpId last_locally_replicated,
+ const OpId& last_locally_committed);
// Changes the queue to leader mode, meaning it tracks majority replicated
// operations and notifies observers when those change.
@@ -324,7 +322,6 @@ class PeerMessageQueue {
};
enum State {
- kQueueConstructed,
kQueueOpen,
kQueueClosed
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 1df37d9..7100630 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -637,18 +637,6 @@ Status Log::DoAppend(LogEntryBatch* entry_batch) {
return Status::OK();
}
- // We keep track of the last-written OpId here.
- // This is needed to initialize Consensus on startup.
- if (entry_batch->type_ == REPLICATE) {
- // TODO Probably remove the code below as it looks suspicious: TabletReplica uses this
- // as 'safe' anchor as it believes it in the log, when it actually isn't, i.e. this
- // is not the last durable operation. Either move this to TabletReplica (since we're
- // using in flights anyway no need to scan for ids here) or actually delay doing this
- // until fsync() has been done. See KUDU-527.
- std::lock_guard<rw_spinlock> write_lock(last_entry_op_id_lock_);
- last_entry_op_id_.CopyFrom(entry_batch->MaxReplicateOpId());
- }
-
// if the size of this entry overflows the current segment, get a new one
if (allocation_state() == kAllocationNotStarted) {
if ((active_segment_->Size() + entry_batch_bytes + 4) > max_segment_size_) {
@@ -828,15 +816,6 @@ Status Log::WaitUntilAllFlushed() {
return s.Wait();
}
-void Log::GetLatestEntryOpId(consensus::OpId* op_id) const {
- shared_lock<rw_spinlock> l(last_entry_op_id_lock_);
- if (last_entry_op_id_.IsInitialized()) {
- DCHECK_NOTNULL(op_id)->CopyFrom(last_entry_op_id_);
- } else {
- *op_id = consensus::MinimumOpId();
- }
-}
-
Status Log::GC(RetentionIndexes retention_indexes, int32_t* num_gced) {
CHECK_GE(retention_indexes.for_durability, 0);
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 9f4d77c..c71dbbd 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -169,10 +169,6 @@ class Log : public RefCountedThreadSafe<Log> {
return tablet_id_;
}
- // Gets the last-used OpId written to the log.
- // If no entry has ever been written to the log, returns (0, 0)
- void GetLatestEntryOpId(consensus::OpId* op_id) const;
-
// Runs the garbage collector on the set of previous segments. Segments that
// only refer to in-mem state that has been flushed are candidates for
// garbage collection.
@@ -369,14 +365,6 @@ class Log : public RefCountedThreadSafe<Log> {
// of the operation in the log.
scoped_refptr<LogIndex> log_index_;
- // Lock to protect last_entry_op_id_, which is constantly written but
- // read occasionally by things like consensus and log GC.
- mutable rw_spinlock last_entry_op_id_lock_;
-
- // The last known OpId for a REPLICATE message appended to this log
- // (any segment). NOTE: this op is not necessarily durable.
- consensus::OpId last_entry_op_id_;
-
// A footer being prepared for the current segment.
// When the segment is closed, it will be written.
LogSegmentFooterPB footer_builder_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/mt-log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index b5502e6..6f4ea43 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -184,11 +184,9 @@ class MultiThreadedLogTest : public LogTestBase {
for (int i = 0; i < FLAGS_num_reader_threads; i++) {
reader_threads.emplace_back([&]() {
std::map<int64_t, int64_t> map;
- OpId opid;
while (!stop_reader) {
- log_->GetLatestEntryOpId(&opid);
log_->GetReplaySizeMap(&map);
- IgnoreResult(log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread)));
+ log_->GetGCableDataSize(RetentionIndexes(FLAGS_num_batches_per_thread));
}
});
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2c900a5..c082b89 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -149,6 +149,7 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
using kudu::pb_util::SecureShortDebugString;
using kudu::tserver::TabletServerErrorPB;
using std::string;
+using std::unique_ptr;
using strings::Substitute;
namespace {
@@ -220,17 +221,24 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
ReplicaTransactionFactory* txn_factory,
scoped_refptr<MetricEntity> metric_entity,
Callback<void(const std::string& reason)> mark_dirty_clbk) {
+ DCHECK(metric_entity);
+
peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
log_ = DCHECK_NOTNULL(std::move(log));
time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
txn_factory_ = DCHECK_NOTNULL(txn_factory);
mark_dirty_clbk_ = std::move(mark_dirty_clbk);
- DCHECK(metric_entity);
term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, cmeta_->current_term());
follower_memory_pressure_rejections_ =
metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);
+ // A single Raft thread pool token is shared between RaftConsensus and
+ // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
+ // raw pointer to the token, to emphasize that RaftConsensus is responsible
+ // for destroying the token.
+ raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+
// The message queue that keeps track of which operations need to be replicated
// where.
//
@@ -240,30 +248,27 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
//
// TODO(adar): the token is SERIAL to match the previous single-thread
// observer pool behavior, but CONCURRENT may be safe here.
- queue_.reset(new PeerMessageQueue(std::move(metric_entity),
- log_,
- time_manager_,
- local_peer_pb_,
- options_.tablet_id,
- raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
-
- // A single Raft thread pool token is shared between RaftConsensus and
- // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
- // raw pointer to the token, to emphasize that RaftConsensus is responsible
- // for destroying the token.
- raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+ unique_ptr<PeerMessageQueue> queue(new PeerMessageQueue(
+ std::move(metric_entity),
+ log_,
+ time_manager_,
+ local_peer_pb_,
+ options_.tablet_id,
+ raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ info.last_id,
+ info.last_committed_id));
// A manager for the set of peers that actually send the operations both remotely
// and to the local wal.
- peer_manager_.reset(new PeerManager(options_.tablet_id,
- peer_uuid(),
- peer_proxy_factory_.get(),
- queue_.get(),
- raft_pool_token_.get(),
- log_));
-
- pending_.reset(new PendingRounds(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid()),
- time_manager_));
+ unique_ptr<PeerManager> peer_manager(new PeerManager(options_.tablet_id,
+ peer_uuid(),
+ peer_proxy_factory_.get(),
+ queue.get(),
+ raft_pool_token_.get(),
+ log_));
+
+ unique_ptr<PendingRounds> pending(new PendingRounds(LogPrefixThreadSafe(), time_manager_));
+
failure_monitor_.reset(new RandomizedFailureMonitor(GetRandomSeed32(),
GetFailureMonitorCheckMeanMs(),
GetFailureMonitorCheckStddevMs()));
@@ -281,7 +286,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
// That happens separately via the helper functions
// EnsureFailureDetector(Enabled/Disabled)Unlocked();
RETURN_NOT_OK(failure_monitor_->MonitorFailureDetector(options_.tablet_id,
- failure_detector_));
+ failure_detector_));
{
ThreadRestrictions::AssertWaitAllowed();
@@ -289,7 +294,12 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
CHECK_EQ(kInitialized, state_) << LogPrefixUnlocked() << "Illegal state for Start(): "
<< State_Name(state_);
+ queue_ = std::move(queue);
+ peer_manager_ = std::move(peer_manager);
+ pending_ = std::move(pending);
+
ClearLeaderUnlocked();
+ RETURN_NOT_OK(EnsureFailureDetectorEnabled());
// Our last persisted term can be higher than the last persisted operation
// (i.e. if we called an election) but reverse should never happen.
@@ -302,8 +312,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
GetCurrentTermUnlocked()));
}
- state_ = kRunning;
-
+ // Append any uncommitted replicate messages found during log replay to the queue.
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
<< info.orphaned_replicates.size()
<< " pending transactions. Active config: "
@@ -313,18 +322,10 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
}
+ // Set the initial committed opid for the PendingRounds only after
+ // appending any uncommitted replicate messages to the queue.
pending_->SetInitialCommittedOpId(info.last_committed_id);
- queue_->Init(info.last_id, info.last_committed_id);
- }
-
- {
- ThreadRestrictions::AssertWaitAllowed();
- LockGuard l(lock_);
- RETURN_NOT_OK(CheckRunningUnlocked());
-
- RETURN_NOT_OK(EnsureFailureDetectorEnabled());
-
// If this is the first term expire the FD immediately so that we have a fast first
// election, otherwise we just let the timer expire normally.
if (GetCurrentTermUnlocked() == 0) {
@@ -344,6 +345,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
// Now assume "follower" duties.
RETURN_NOT_OK(BecomeReplicaUnlocked());
+
+ state_ = kRunning;
}
if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
@@ -650,6 +653,7 @@ Status RaftConsensus::AppendNewRoundToQueueUnlocked(const scoped_refptr<Consensu
Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round) {
DCHECK(lock_.is_locked());
+ DCHECK(pending_);
// If we are adding a pending config change, we need to propagate it to the
// metadata.
@@ -1509,7 +1513,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
// Candidate must have last-logged OpId at least as large as our own to get
// our vote.
- OpId local_last_logged_opid = GetLatestOpIdFromLog();
+ OpId local_last_logged_opid = queue_->GetLastOpIdInLog();
bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
local_last_logged_opid);
@@ -1826,12 +1830,6 @@ void RaftConsensus::Shutdown() {
shutdown_.Store(true, kMemOrderRelease);
}
-OpId RaftConsensus::GetLatestOpIdFromLog() {
- OpId id;
- log_->GetLatestEntryOpId(&id);
- return id;
-}
-
Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg) {
DCHECK(lock_.is_locked());
OperationType op_type = msg->get()->op_type();
@@ -2241,18 +2239,27 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
}
}
-Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
+boost::optional<OpId> RaftConsensus::GetLastOpId(OpIdType type) {
ThreadRestrictions::AssertWaitAllowed();
LockGuard l(lock_);
- if (type == RECEIVED_OPID) {
- *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
- } else if (type == COMMITTED_OPID) {
- id->set_term(pending_->GetTermWithLastCommittedOp());
- id->set_index(pending_->GetCommittedIndex());
- } else {
- return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
+ return GetLastOpIdUnlocked(type);
+}
+
+boost::optional<OpId> RaftConsensus::GetLastOpIdUnlocked(OpIdType type) {
+ // Return early if this method is called on an instance of RaftConsensus that
+ // has not yet been started, failed during Init(), or failed during Start().
+ if (!queue_ || !pending_) return boost::none;
+
+ switch (type) {
+ case RECEIVED_OPID:
+ return queue_->GetLastOpIdInLog();
+ case COMMITTED_OPID:
+ return MakeOpId(pending_->GetTermWithLastCommittedOp(),
+ pending_->GetCommittedIndex());
+ default:
+ LOG(DFATAL) << LogPrefixUnlocked() << "Invalid OpIdType " << type;
+ return boost::none;
}
- return Status::OK();
}
log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 4ecb25e..5d761e0 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -262,8 +262,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Returns the last OpId (either received or committed, depending on the
// 'type' argument) that the Consensus implementation knows about.
- // Primarily used for testing purposes.
- Status GetLastOpId(OpIdType type, OpId* id);
+ // Returns boost::none if RaftConsensus has not been started yet, or if Init() or Start() failed.
+ boost::optional<OpId> GetLastOpId(OpIdType type);
// Returns the current Raft role of this instance.
RaftPeerPB::Role role() const;
@@ -454,9 +454,6 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// and also truncate the LogCache accordingly.
void TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index);
- // Returns the most recent OpId written to the Log.
- OpId GetLatestOpIdFromLog();
-
// Begin a replica transaction. If the type of message in 'msg' is not a type
// that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
@@ -675,6 +672,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
const ConsensusOptions& GetOptions() const;
+ // See GetLastOpId().
+ boost::optional<OpId> GetLastOpIdUnlocked(OpIdType type);
+
std::string LogPrefix() const;
std::string LogPrefixUnlocked() const;
@@ -724,10 +724,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// this factory to start it.
ReplicaTransactionFactory* txn_factory_;
- gscoped_ptr<PeerManager> peer_manager_;
+ std::unique_ptr<PeerManager> peer_manager_;
// The queue of messages that must be sent to peers.
- gscoped_ptr<PeerMessageQueue> queue_;
+ std::unique_ptr<PeerMessageQueue> queue_;
// The currently pending rounds that have not yet been committed by
// consensus. Protected by 'lock_'.
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 0790993..5439cd8 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -339,11 +339,14 @@ class RaftConsensusQuorumTest : public KuduTest {
int backoff_exp = 0;
const int kMaxBackoffExp = 8;
- OpId committed;
+ OpId committed = MinimumOpId();
while (true) {
- if (peer->GetLastOpId(COMMITTED_OPID, &committed).ok() &&
- committed.index() >= to_wait_for) {
- return;
+ boost::optional<OpId> opt_committed = peer->GetLastOpId(COMMITTED_OPID);
+ if (opt_committed) {
+ committed = *opt_committed;
+ if (committed.index() >= to_wait_for) {
+ return;
+ }
}
if (MonoTime::Now() > (start + timeout)) {
break;
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_metadata.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 93502e8..4d9a197 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -183,6 +183,7 @@ vector<BlockId> TabletMetadata::CollectBlockIds() {
Status TabletMetadata::DeleteTabletData(TabletDataState delete_type,
const boost::optional<OpId>& last_logged_opid) {
+ DCHECK(!last_logged_opid || last_logged_opid->IsInitialized());
CHECK(delete_type == TABLET_DATA_DELETED ||
delete_type == TABLET_DATA_TOMBSTONED ||
delete_type == TABLET_DATA_COPYING)
@@ -202,9 +203,7 @@ Status TabletMetadata::DeleteTabletData(TabletDataState delete_type,
}
rowsets_.clear();
tablet_data_state_ = delete_type;
- if (last_logged_opid) {
- tombstone_last_logged_opid_ = *last_logged_opid;
- }
+ tombstone_last_logged_opid_ = last_logged_opid;
}
// Keep a copy of the old data dir group in case of flush failure.
@@ -278,7 +277,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id,
table_name_(std::move(table_name)),
partition_schema_(std::move(partition_schema)),
tablet_data_state_(tablet_data_state),
- tombstone_last_logged_opid_(MinimumOpId()),
num_flush_pins_(0),
needs_flush_(false),
pre_flush_callback_(Bind(DoNothingStatusClosure)) {
@@ -297,7 +295,6 @@ TabletMetadata::TabletMetadata(FsManager* fs_manager, string tablet_id)
fs_manager_(fs_manager),
next_rowset_idx_(0),
schema_(nullptr),
- tombstone_last_logged_opid_(MinimumOpId()),
num_flush_pins_(0),
needs_flush_(false),
pre_flush_callback_(Bind(DoNothingStatusClosure)) {}
@@ -403,10 +400,15 @@ Status TabletMetadata::LoadFromSuperBlock(const TabletSuperBlockPB& superblock)
fs::DataDirManager::DirDistributionMode::ACROSS_ALL_DIRS));
}
- if (superblock.has_tombstone_last_logged_opid()) {
+ // Note: Previous versions of Kudu used MinimumOpId() as a "null" value on
+ // disk for the last-logged opid, so we special-case it at load time and
+ // consider it equal to "not present".
+ if (superblock.has_tombstone_last_logged_opid() &&
+ superblock.tombstone_last_logged_opid().IsInitialized() &&
+ !OpIdEquals(MinimumOpId(), superblock.tombstone_last_logged_opid())) {
tombstone_last_logged_opid_ = superblock.tombstone_last_logged_opid();
} else {
- tombstone_last_logged_opid_ = MinimumOpId();
+ tombstone_last_logged_opid_ = boost::none;
}
}
@@ -587,6 +589,11 @@ Status TabletMetadata::ReplaceSuperBlockUnlocked(const TabletSuperBlockPB &pb) {
return Status::OK();
}
+boost::optional<consensus::OpId> TabletMetadata::tombstone_last_logged_opid() const {
+ std::lock_guard<LockType> l(data_lock_);
+ return tombstone_last_logged_opid_;
+}
+
Status TabletMetadata::ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const {
string path = fs_manager_->GetTabletMetadataPath(tablet_id_);
RETURN_NOT_OK_PREPEND(
@@ -623,8 +630,9 @@ Status TabletMetadata::ToSuperBlockUnlocked(TabletSuperBlockPB* super_block,
"Couldn't serialize schema into superblock");
pb.set_tablet_data_state(tablet_data_state_);
- if (!OpIdEquals(tombstone_last_logged_opid_, MinimumOpId())) {
- *pb.mutable_tombstone_last_logged_opid() = tombstone_last_logged_opid_;
+ if (tombstone_last_logged_opid_ &&
+ !OpIdEquals(MinimumOpId(), *tombstone_last_logged_opid_)) {
+ *pb.mutable_tombstone_last_logged_opid() = *tombstone_last_logged_opid_;
}
for (const BlockId& block_id : orphaned_blocks_) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index afac374..ca524b7 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -224,9 +224,10 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
void SetLastDurableMrsIdForTests(int64_t mrs_id) { last_durable_mrs_id_ = mrs_id; }
- void SetPreFlushCallback(StatusClosure callback) { pre_flush_callback_ = callback; }
+ void SetPreFlushCallback(StatusClosure callback) { pre_flush_callback_ = std::move(callback); }
- consensus::OpId tombstone_last_logged_opid() const { return tombstone_last_logged_opid_; }
+ // Return the last-logged opid of a tombstoned tablet, or boost::none if it is not known.
+ boost::optional<consensus::OpId> tombstone_last_logged_opid() const;
// Loads the currently-flushed superblock from disk into the given protobuf.
Status ReadSuperBlockFromDisk(TabletSuperBlockPB* superblock) const;
@@ -353,7 +354,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
// Record of the last opid logged by the tablet before it was last
// tombstoned. Has no meaning for non-tombstoned tablets.
- consensus::OpId tombstone_last_logged_opid_;
+ boost::optional<consensus::OpId> tombstone_last_logged_opid_;
// If this counter is > 0 then Flush() will not write any data to
// disk.
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 7ebb64e..1f7de80 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -85,7 +85,9 @@ using consensus::ConsensusBootstrapInfo;
using consensus::ConsensusMetadata;
using consensus::ConsensusMetadataManager;
using consensus::OpId;
+using consensus::RECEIVED_OPID;
using consensus::RaftConfigPB;
+using consensus::RaftConsensus;
using consensus::RaftPeerPB;
using log::Log;
using log::LogOptions;
@@ -276,11 +278,11 @@ class TabletReplicaTest : public KuduTabletTest {
// Assert that the Log GC() anchor is earlier than the latest OpId in the Log.
void AssertLogAnchorEarlierThanLogLatest() {
log::RetentionIndexes retention = tablet_replica_->GetRetentionIndexes();
- OpId last_log_opid;
- tablet_replica_->log_->GetLatestEntryOpId(&last_log_opid);
- CHECK_LT(retention.for_durability, last_log_opid.index())
+ boost::optional<OpId> last_log_opid = tablet_replica_->consensus()->GetLastOpId(RECEIVED_OPID);
+ ASSERT_NE(boost::none, last_log_opid);
+ ASSERT_LT(retention.for_durability, last_log_opid->index())
<< "Expected valid log anchor, got earliest opid: " << retention.for_durability
- << " (expected any value earlier than last log id: " << SecureShortDebugString(last_log_opid)
+ << " (expected any value earlier than last log id: " << SecureShortDebugString(*last_log_opid)
<< ")";
}
@@ -343,7 +345,7 @@ TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
ASSERT_EQ(4, segments.size());
- AssertLogAnchorEarlierThanLogLatest();
+ NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
// Ensure nothing gets deleted.
@@ -371,6 +373,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
Log* log = tablet_replica_->log_.get();
+ shared_ptr<RaftConsensus> consensus = tablet_replica_->shared_consensus();
int32_t num_gced;
AssertNoLogAnchors();
@@ -394,10 +397,9 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
ASSERT_EQ(2, segments.size());
AssertNoLogAnchors();
- OpId id;
- log->GetLatestEntryOpId(&id);
- LOG(INFO) << "Before: " << SecureShortDebugString(id);
-
+ boost::optional<OpId> id = consensus->GetLastOpId(consensus::RECEIVED_OPID);
+ ASSERT_NE(boost::none, id);
+ LOG(INFO) << "Before: " << *id;
// We currently have no anchors and the last operation in the log is 0.3
// Before the below was ExecuteDeletesAndRollLogs(1) but that was breaking
@@ -409,7 +411,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
// Execute a mutation.
ASSERT_OK(ExecuteDeletesAndRollLogs(2));
- AssertLogAnchorEarlierThanLogLatest();
+ NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
ASSERT_GT(tablet_replica_->log_anchor_registry()->GetAnchorCountForTests(), 0);
ASSERT_OK(log->reader()->GetSegmentsSnapshot(&segments));
ASSERT_EQ(4, segments.size());
@@ -529,7 +531,7 @@ TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
ASSERT_EQ(1, tablet_replica_->txn_tracker_.GetNumPendingForTests());
- AssertLogAnchorEarlierThanLogLatest();
+ NO_FATALS(AssertLogAnchorEarlierThanLogLatest());
// Try to GC(), nothing should be deleted due to the in-flight transaction.
retention = tablet_replica_->GetRetentionIndexes();
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 661a5ce..0051f91 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -123,6 +123,7 @@ using cfile::StringDataGenerator;
using cfile::WriterOptions;
using client::sp::shared_ptr;
using consensus::OpId;
+using consensus::RECEIVED_OPID;
using consensus::ReplicateRefPtr;
using consensus::ReplicateMsg;
using fs::FsReport;
@@ -1552,15 +1553,14 @@ TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) {
// so that we can compare the size of the data on disk before and
// after the deletion of local_replica to verify that the size-on-disk
// is reduced after the tool operation.
- OpId last_logged_opid;
- last_logged_opid.Clear();
+ boost::optional<OpId> last_logged_opid;
string tablet_id;
{
vector<scoped_refptr<TabletReplica>> tablet_replicas;
ts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas);
ASSERT_EQ(1, tablet_replicas.size());
tablet_id = tablet_replicas[0]->tablet_id();
- tablet_replicas[0]->log()->GetLatestEntryOpId(&last_logged_opid);
+ last_logged_opid = tablet_replicas[0]->shared_consensus()->GetLastOpId(RECEIVED_OPID);
Tablet* tablet = tablet_replicas[0]->tablet();
ASSERT_OK(tablet->Flush());
}
@@ -1594,10 +1594,12 @@ TEST_F(ToolTest, TestLocalReplicaTombstoneDelete) {
ASSERT_EQ(tablet_id, tablet_replicas[0]->tablet_id());
ASSERT_EQ(TabletDataState::TABLET_DATA_TOMBSTONED,
tablet_replicas[0]->tablet_metadata()->tablet_data_state());
- OpId tombstoned_opid = tablet_replicas[0]->tablet_metadata()->tombstone_last_logged_opid();
- ASSERT_TRUE(tombstoned_opid.IsInitialized());
- ASSERT_EQ(last_logged_opid.term(), tombstoned_opid.term());
- ASSERT_EQ(last_logged_opid.index(), tombstoned_opid.index());
+ boost::optional<OpId> tombstoned_opid =
+ tablet_replicas[0]->tablet_metadata()->tombstone_last_logged_opid();
+ ASSERT_NE(boost::none, tombstoned_opid);
+ ASSERT_NE(boost::none, last_logged_opid);
+ ASSERT_EQ(last_logged_opid->term(), tombstoned_opid->term());
+ ASSERT_EQ(last_logged_opid->index(), tombstoned_opid->index());
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 672840e..a75750c 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -89,6 +89,7 @@ namespace tserver {
using consensus::ConsensusMetadata;
using consensus::ConsensusMetadataManager;
+using consensus::OpId;
using env_util::CopyFile;
using fs::CreateBlockOptions;
using fs::WritableBlock;
@@ -141,17 +142,28 @@ Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>&
data_state));
}
- replace_tombstoned_tablet_ = true;
- meta_ = meta;
-
- int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
- if (last_logged_term > caller_term) {
+ boost::optional<OpId> last_logged_opid = meta->tombstone_last_logged_opid();
+ if (!last_logged_opid) {
+ // There are certain cases where we can end up with a tombstoned replica
+ // that does not store its last-logged opid. One such case is when there is
+ // WAL corruption at startup time, resulting in a replica being evicted and
+ // deleted. In such a case, it is not possible to determine the last-logged
+ // opid. Another such case (at the time of writing) is initialization
+ // failure due to any number of problems, resulting in the replica going
+ // into an error state. If the replica is tombstoned while in an error
+ // state, the last-logged opid will not be stored. See KUDU-2106.
+ LOG_WITH_PREFIX(INFO) << "overwriting existing tombstoned replica "
+ "with an unknown last-logged opid";
+ } else if (last_logged_opid->term() > caller_term) {
return Status::InvalidArgument(
Substitute("Leader has term $0 but the last log entry written by the tombstoned replica "
"for tablet $1 has higher term $2. Refusing tablet copy from leader",
- caller_term, tablet_id_, last_logged_term));
+ caller_term, tablet_id_, last_logged_opid->term()));
}
+ replace_tombstoned_tablet_ = true;
+ meta_ = meta;
+
// Load the old consensus metadata, if it exists.
scoped_refptr<ConsensusMetadata> cmeta;
Status s = cmeta_manager_->Load(tablet_id_, &cmeta);
@@ -243,15 +255,16 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
// misconfiguration causes us to attempt to copy from an out-of-date
// source peer, even after passing the term check from the caller in
// SetTabletToReplace().
- int64_t last_logged_term = meta_->tombstone_last_logged_opid().term();
- if (last_logged_term > remote_cstate_->current_term()) {
+
+ boost::optional<OpId> last_logged_opid = meta_->tombstone_last_logged_opid();
+ if (last_logged_opid && last_logged_opid->term() > remote_cstate_->current_term()) {
return Status::InvalidArgument(
Substitute("Tablet $0: source peer has term $1 but "
"tombstoned replica has last-logged opid with higher term $2. "
"Refusing tablet copy from source peer $3",
tablet_id_,
remote_cstate_->current_term(),
- last_logged_term,
+ last_logged_opid->term(),
copy_peer_uuid));
}
@@ -644,7 +657,7 @@ Status TabletCopyClient::VerifyData(uint64_t offset, const DataChunkPB& chunk) {
}
string TabletCopyClient::LogPrefix() {
- return Substitute("T $0 P $1: Tablet Copy client: ",
+ return Substitute("T $0 P $1: tablet copy: ",
tablet_id_, fs_manager_->uuid());
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_copy_source_session.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index 8c57cf0..bf6a313 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -114,8 +114,10 @@ Status TabletCopySourceSession::Init() {
}
// Get the latest opid in the log at this point in time so we can re-anchor.
- OpId last_logged_opid;
- CHECK_NOTNULL(tablet_replica_->log())->GetLatestEntryOpId(&last_logged_opid);
+ // TODO(mpercy): Do we need special handling for the boost::none case?
+ boost::optional<OpId> last_logged_opid =
+ tablet_replica_->consensus()->GetLastOpId(consensus::RECEIVED_OPID);
+ if (!last_logged_opid) last_logged_opid = MinimumOpId();
// Get the current segments from the log, including the active segment.
// The Log doesn't add the active segment to the log reader's list until
@@ -153,7 +155,7 @@ Status TabletCopySourceSession::Init() {
// leader's log when tablet copy is slow. The remote controls when
// this anchor is released by ending the tablet copy session.
RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->UpdateRegistration(
- last_logged_opid.index(), anchor_owner_token, &log_anchor_));
+ last_logged_opid->index(), anchor_owner_token, &log_anchor_));
LOG(INFO) << Substitute(
"T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index f4cb01d..9f0bfe2 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -143,6 +143,9 @@ using kudu::consensus::GetNodeInstanceRequestPB;
using kudu::consensus::GetNodeInstanceResponsePB;
using kudu::consensus::LeaderStepDownRequestPB;
using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::OpId;
+using kudu::consensus::UnsafeChangeConfigRequestPB;
+using kudu::consensus::UnsafeChangeConfigResponsePB;
using kudu::consensus::RaftConsensus;
using kudu::consensus::RunLeaderElectionRequestPB;
using kudu::consensus::RunLeaderElectionResponsePB;
@@ -1070,13 +1073,15 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
resp, context);
return;
}
- Status s = consensus->GetLastOpId(req->opid_type(), resp->mutable_opid());
- if (PREDICT_FALSE(!s.ok())) {
- SetupErrorAndRespond(resp->mutable_error(), s,
- TabletServerErrorPB::UNKNOWN_ERROR,
+ boost::optional<OpId> opid = consensus->GetLastOpId(req->opid_type());
+ if (!opid) {
+ SetupErrorAndRespond(resp->mutable_error(),
+ Status::IllegalState("Cannot fetch last OpId in WAL"),
+ TabletServerErrorPB::TABLET_NOT_RUNNING,
context);
return;
}
+ *resp->mutable_opid() = *opid;
context->RespondSuccess();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/dc65abba/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 7972380..47b8102 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -126,7 +126,10 @@ class Tablet;
using consensus::ConsensusMetadata;
using consensus::ConsensusMetadataManager;
using consensus::OpId;
+using consensus::OpIdToString;
+using consensus::RECEIVED_OPID;
using consensus::RaftConfigPB;
+using consensus::RaftConsensus;
using consensus::StartTabletCopyRequestPB;
using consensus::kMinimumTerm;
using fs::DataDirManager;
@@ -470,23 +473,29 @@ void TSTabletManager::RunTabletCopy(
LOG(FATAL) << LogPrefix(tablet_id) << "Tablet Copy: "
<< "Found tablet in TABLET_DATA_COPYING state during StartTabletCopy()";
case TABLET_DATA_TOMBSTONED: {
- int64_t last_logged_term = meta->tombstone_last_logged_opid().term();
- CALLBACK_RETURN_NOT_OK_WITH_ERROR(
- CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term),
- TabletServerErrorPB::INVALID_CONFIG);
+ boost::optional<OpId> last_logged_opid = meta->tombstone_last_logged_opid();
+ if (last_logged_opid) {
+ CALLBACK_RETURN_NOT_OK_WITH_ERROR(CheckLeaderTermNotLower(tablet_id, leader_term,
+ last_logged_opid->term()),
+ TabletServerErrorPB::INVALID_CONFIG);
+ }
break;
}
case TABLET_DATA_READY: {
- Log* log = old_replica->log();
- if (!log) {
+ shared_ptr<RaftConsensus> consensus = old_replica->shared_consensus();
+ if (!consensus) {
CALLBACK_AND_RETURN(
- Status::IllegalState("Log unavailable. Tablet is not running", tablet_id));
+ Status::IllegalState("consensus unavailable: tablet not running", tablet_id));
}
- OpId last_logged_opid;
- log->GetLatestEntryOpId(&last_logged_opid);
- int64_t last_logged_term = last_logged_opid.term();
+ boost::optional<OpId> opt_last_logged_opid = consensus->GetLastOpId(RECEIVED_OPID);
+ if (!opt_last_logged_opid) {
+ CALLBACK_AND_RETURN(
+ Status::IllegalState("cannot determine last-logged opid: tablet not running",
+ tablet_id));
+ }
+ CHECK(opt_last_logged_opid);
CALLBACK_RETURN_NOT_OK_WITH_ERROR(
- CheckLeaderTermNotLower(tablet_id, leader_term, last_logged_term),
+ CheckLeaderTermNotLower(tablet_id, leader_term, opt_last_logged_opid->term()),
TabletServerErrorPB::INVALID_CONFIG);
// Tombstone the tablet and store the last-logged OpId.
@@ -503,7 +512,8 @@ void TSTabletManager::RunTabletCopy(
// will simply tablet copy this replica again. We could try to
// check again after calling Shutdown(), and if the check fails, try to
// reopen the tablet. For now, we live with the (unlikely) race.
- Status s = DeleteTabletData(meta, cmeta_manager_, TABLET_DATA_TOMBSTONED, last_logged_opid);
+ Status s = DeleteTabletData(meta, cmeta_manager_, TABLET_DATA_TOMBSTONED,
+ opt_last_logged_opid);
if (PREDICT_FALSE(!s.ok())) {
CALLBACK_AND_RETURN(
s.CloneAndPrepend(Substitute("Unable to delete on-disk data from tablet $0",
@@ -636,8 +646,8 @@ Status TSTabletManager::DeleteTablet(
// it's tricky to fix. We could try checking again after the shutdown and
// restarting the tablet if the local replica committed a higher config
// change op during that time, or potentially something else more invasive.
+ shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
if (cas_config_opid_index_less_or_equal && !tablet_deleted) {
- shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
if (!consensus) {
*error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
return Status::IllegalState("Raft Consensus not available. Tablet shutting down");
@@ -655,10 +665,9 @@ Status TSTabletManager::DeleteTablet(
replica->Shutdown();
boost::optional<OpId> opt_last_logged_opid;
- if (replica->log()) {
- OpId last_logged_opid;
- replica->log()->GetLatestEntryOpId(&last_logged_opid);
- opt_last_logged_opid = last_logged_opid;
+ if (consensus) {
+ opt_last_logged_opid = consensus->GetLastOpId(RECEIVED_OPID);
+ DCHECK(!opt_last_logged_opid || opt_last_logged_opid->IsInitialized());
}
Status s = DeleteTabletData(replica->tablet_metadata(), cmeta_manager_, delete_type,
@@ -1063,8 +1072,8 @@ Status TSTabletManager::DeleteTabletData(
// that was previously in the metadata.
RETURN_NOT_OK(meta->DeleteTabletData(delete_type, last_logged_opid));
LOG(INFO) << LogPrefix(tablet_id, meta->fs_manager())
- << "Tablet deleted. Last logged OpId: "
- << meta->tombstone_last_logged_opid();
+ << "tablet deleted: last-logged OpId: "
+ << (last_logged_opid ? OpIdToString(*last_logged_opid) : "(unknown)");
MAYBE_FAULT(FLAGS_fault_crash_after_blocks_deleted);
RETURN_NOT_OK(Log::DeleteOnDiskData(meta->fs_manager(), meta->tablet_id()));
[2/2] kudu git commit: consensus: Tablet copy should clear
last-logged opid from superblock
Posted by mp...@apache.org.
consensus: Tablet copy should clear last-logged opid from superblock
Keeping around irrelevant information is bad hygiene.
Change-Id: Iaa84d59c63222e9ddb05dca492f9ecd47b5c63ea
Reviewed-on: http://gerrit.cloudera.org:8080/7718
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a7d58963
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a7d58963
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a7d58963
Branch: refs/heads/master
Commit: a7d589635bd889fe31205a600739ae938b803d97
Parents: dc65abb
Author: Mike Percy <mp...@apache.org>
Authored: Thu Aug 17 00:26:00 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Aug 22 00:26:58 2017 +0000
----------------------------------------------------------------------
src/kudu/integration-tests/tablet_copy-itest.cc | 62 ++++++++++++++++++++
src/kudu/tablet/tablet_metadata.cc | 3 +
src/kudu/tablet/tablet_metadata.h | 1 +
3 files changed, 66 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7d58963/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index e8f44b1..0e0977d 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -904,6 +904,68 @@ TEST_F(TabletCopyITest, TestTabletCopyingDeletedTabletFails) {
ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
}
+// Test that tablet copy clears the last-logged opid stored in the TabletMetadata.
+TEST_F(TabletCopyITest, TestTabletCopyClearsLastLoggedOpId) {
+ MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+ NO_FATALS(StartCluster({"--enable_leader_failure_detection=false"},
+ {"--catalog_manager_wait_for_new_tablets_to_elect_leader=false"}));
+
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(3);
+ workload.Setup();
+
+ int leader_index = 0;
+ TServerDetails* leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
+
+ // Figure out the tablet id of the created tablet.
+ vector<ListTabletsResponsePB::StatusAndSchemaPB> tablets;
+ ASSERT_OK(WaitForNumTabletsOnTS(leader, 1, kTimeout, &tablets));
+ string tablet_id = tablets[0].tablet_status().tablet_id();
+
+ // Wait until all replicas are up and running.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+ tablet_id, kTimeout));
+ }
+
+ // Elect a leader for term 1, then generate some data to copy.
+ ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
+ workload.Start();
+ const int kMinBatches = 10;
+ while (workload.batches_completed() < kMinBatches) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ workload.StopAndJoin();
+ ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, 1));
+
+ // No last-logged opid should initially be stored in the superblock.
+ tablet::TabletSuperBlockPB superblock_pb;
+ inspect_->ReadTabletSuperBlockOnTS(leader_index, tablet_id, &superblock_pb);
+ ASSERT_FALSE(superblock_pb.has_tombstone_last_logged_opid());
+
+ // Now tombstone the leader.
+ ASSERT_OK(itest::DeleteTablet(leader, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+ // We should end up with a last-logged opid in the superblock.
+ inspect_->ReadTabletSuperBlockOnTS(leader_index, tablet_id, &superblock_pb);
+ ASSERT_TRUE(superblock_pb.has_tombstone_last_logged_opid());
+ consensus::OpId last_logged_opid = superblock_pb.tombstone_last_logged_opid();
+ ASSERT_EQ(1, last_logged_opid.term());
+ ASSERT_GT(last_logged_opid.index(), kMinBatches);
+
+ int follower_index = 1;
+ ASSERT_OK(itest::StartTabletCopy(leader, tablet_id,
+ cluster_->tablet_server(follower_index)->uuid(),
+ cluster_->tablet_server(follower_index)->bound_rpc_hostport(),
+ 1, // We are in term 1.
+ kTimeout));
+ ASSERT_OK(itest::WaitUntilTabletRunning(leader, tablet_id, kTimeout));
+
+ // Ensure that the last-logged opid has been cleared from the superblock.
+ inspect_->ReadTabletSuperBlockOnTS(leader_index, tablet_id, &superblock_pb);
+ ASSERT_FALSE(superblock_pb.has_tombstone_last_logged_opid());
+}
+
// Test that the tablet copy thread pool being full results in throttling and
// backpressure on the callers.
TEST_F(TabletCopyITest, TestTabletCopyThrottling) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7d58963/src/kudu/tablet/tablet_metadata.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.cc b/src/kudu/tablet/tablet_metadata.cc
index 4d9a197..4ed5352 100644
--- a/src/kudu/tablet/tablet_metadata.cc
+++ b/src/kudu/tablet/tablet_metadata.cc
@@ -717,6 +717,9 @@ uint32_t TabletMetadata::schema_version() const {
void TabletMetadata::set_tablet_data_state(TabletDataState state) {
std::lock_guard<LockType> l(data_lock_);
+ if (state == TABLET_DATA_READY) {
+ tombstone_last_logged_opid_ = boost::none;
+ }
tablet_data_state_ = state;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/a7d58963/src/kudu/tablet/tablet_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index ca524b7..3bbd849 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -147,6 +147,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
}
// Set / get the tablet copy / tablet data state.
+ // If set to TABLET_DATA_READY, also clears 'tombstone_last_logged_opid_'.
void set_tablet_data_state(TabletDataState state);
TabletDataState tablet_data_state() const;