You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/09/01 22:24:59 UTC
[kudu] branch master updated: KUDU-2612 p9: anchor participant ops
in WAL
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 68ee03c KUDU-2612 p9: anchor participant ops in WAL
68ee03c is described below
commit 68ee03c2bd0bae950c08582910e675cda54b6ca7
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Aug 7 18:16:46 2020 -0700
KUDU-2612 p9: anchor participant ops in WAL
Participant ops will now anchor WAL segments in the same way that we
anchor WAL segments for other in-memory state (e.g. MRS, DMS) that gets
rebuilt upon recovery.
Each participant op for a given transaction will update the single
anchor associated with that transaction. As a transaction is
transitioned from state to state, the transaction's prior participant op
is unanchored in favor of anchoring the new op.
Since there currently isn't a need to rid a participant of in-memory
state associated with a transaction, this adds a test-only method to
remove a committed or aborted transaction -- once this method is called, a
transaction's anchor is removed and WAL GC may proceed to remove the
latest participant op.
Change-Id: I936f0a345c4b6095f0d99b6dd244e3092ae3f9d7
Reviewed-on: http://gerrit.cloudera.org:8080/16358
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/log_anchor_registry-test.cc | 17 +-
src/kudu/consensus/log_anchor_registry.cc | 19 +-
src/kudu/consensus/log_anchor_registry.h | 12 +-
src/kudu/gutil/map-util.h | 14 +-
src/kudu/tablet/ops/participant_op.cc | 17 +-
src/kudu/tablet/ops/participant_op.h | 9 +-
src/kudu/tablet/tablet.h | 6 +-
src/kudu/tablet/tablet_bootstrap.cc | 2 +-
src/kudu/tablet/tablet_replica-test-base.cc | 40 +++-
src/kudu/tablet/tablet_replica-test-base.h | 16 +-
src/kudu/tablet/tablet_replica-test.cc | 18 --
src/kudu/tablet/txn_participant-test-util.h | 28 ++-
src/kudu/tablet/txn_participant-test.cc | 250 ++++++++++++++++++++++++-
src/kudu/tablet/txn_participant.cc | 20 +-
src/kudu/tablet/txn_participant.h | 55 +++++-
src/kudu/tserver/tablet_copy_source_session.cc | 2 +-
16 files changed, 449 insertions(+), 76 deletions(-)
diff --git a/src/kudu/consensus/log_anchor_registry-test.cc b/src/kudu/consensus/log_anchor_registry-test.cc
index a824152..04daf0c 100644
--- a/src/kudu/consensus/log_anchor_registry-test.cc
+++ b/src/kudu/consensus/log_anchor_registry-test.cc
@@ -25,6 +25,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
@@ -48,10 +49,24 @@ TEST_F(LogAnchorRegistryTest, TestUpdateRegistration) {
ASSERT_FALSE(anchor.is_registered);
ASSERT_FALSE(anchor.when_registered.Initialized());
reg->Register(kInitialIndex, test_name, &anchor);
+ SCOPED_CLEANUP({
+ ASSERT_OK(reg->Unregister(&anchor));
+ ASSERT_FALSE(anchor.is_registered);
+ });
ASSERT_TRUE(anchor.is_registered);
ASSERT_TRUE(anchor.when_registered.Initialized());
- ASSERT_OK(reg->UpdateRegistration(kInitialIndex + 1, test_name, &anchor));
+ ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 1, test_name, &anchor));
+ ASSERT_EQ(kInitialIndex + 1, anchor.log_index);
ASSERT_OK(reg->Unregister(&anchor));
+ ASSERT_FALSE(anchor.is_registered);
+
+ ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 2, test_name, &anchor));
+ ASSERT_TRUE(anchor.is_registered);
+ ASSERT_EQ(kInitialIndex + 2, anchor.log_index);
+
+ ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 3, test_name, &anchor));
+ ASSERT_TRUE(anchor.is_registered);
+ ASSERT_EQ(kInitialIndex + 3, anchor.log_index);
}
TEST_F(LogAnchorRegistryTest, TestDuplicateInserts) {
diff --git a/src/kudu/consensus/log_anchor_registry.cc b/src/kudu/consensus/log_anchor_registry.cc
index 648e2ab..a2e5c5e 100644
--- a/src/kudu/consensus/log_anchor_registry.cc
+++ b/src/kudu/consensus/log_anchor_registry.cc
@@ -50,12 +50,13 @@ void LogAnchorRegistry::Register(int64_t log_index,
RegisterUnlocked(log_index, owner, anchor);
}
-Status LogAnchorRegistry::UpdateRegistration(int64_t log_index,
- const std::string& owner,
- LogAnchor* anchor) {
+Status LogAnchorRegistry::RegisterOrUpdate(int64_t log_index,
+ const std::string& owner,
+ LogAnchor* anchor) {
std::lock_guard<simple_spinlock> l(lock_);
- RETURN_NOT_OK_PREPEND(UnregisterUnlocked(anchor),
- "Unable to swap registration, anchor not registered");
+ if (anchor->is_registered) {
+ RETURN_NOT_OK(UnregisterUnlocked(anchor));
+ }
RegisterUnlocked(log_index, owner, anchor);
return Status::OK();
}
@@ -157,12 +158,10 @@ MinLogIndexAnchorer::~MinLogIndexAnchorer() {
void MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
std::lock_guard<simple_spinlock> l(lock_);
- if (PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
- minimum_log_index_ = log_index;
- registry_->Register(minimum_log_index_, owner_, &anchor_);
- } else if (log_index < minimum_log_index_) {
+ if (log_index < minimum_log_index_ ||
+ PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
minimum_log_index_ = log_index;
- CHECK_OK(registry_->UpdateRegistration(minimum_log_index_, owner_, &anchor_));
+ registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_);
}
}
diff --git a/src/kudu/consensus/log_anchor_registry.h b/src/kudu/consensus/log_anchor_registry.h
index 56857d7..6ccb40f 100644
--- a/src/kudu/consensus/log_anchor_registry.h
+++ b/src/kudu/consensus/log_anchor_registry.h
@@ -51,13 +51,11 @@ class LogAnchorRegistry : public RefCountedThreadSafe<LogAnchorRegistry> {
// anchor: Pointer to LogAnchor structure that will be populated on registration.
void Register(int64_t log_index, const std::string& owner, LogAnchor* anchor);
- // Atomically update the registration of an anchor to a new log index.
- // Before: anchor must be registered with some log index.
- // After: anchor is now registered using index 'log_index'.
- // See Register().
- Status UpdateRegistration(int64_t log_index,
- const std::string& owner,
- LogAnchor* anchor);
+ // Register interest for a particular log index, or update an anchor if it
+ // already exists.
+ Status RegisterOrUpdate(int64_t log_index,
+ const std::string& owner,
+ LogAnchor* anchor);
// Release the anchor on a log index.
// Note: anchor must be the original pointer passed to Register().
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index ae6d2d1..985b91c 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -616,22 +616,22 @@ LookupOrInsertNewSharedPtr(
}
// A variant of LookupOrInsertNewSharedPtr where the value is constructed using
-// a single-parameter constructor. Note: the constructor argument is computed
-// even if it will not be used, so only values cheap to compute should be passed
-// here. On the other hand it does not matter how expensive the construction of
-// the actual stored value is, as that only occurs if necessary.
-template <class Collection, class Arg>
+// constructor arguments. Note: the constructor arguments are computed even if
+// they will not be used, so only values cheap to compute should be passed
+// here. On the other hand it does not matter how expensive the construction
+// of the actual stored value is, as that only occurs if necessary.
+template <class Collection, class... Args>
typename Collection::mapped_type&
LookupOrInsertNewSharedPtr(
Collection* const collection,
const typename Collection::key_type& key,
- const Arg& arg) {
+ const Args&... args) {
typedef typename Collection::mapped_type SharedPtr;
typedef typename Collection::mapped_type::element_type Element;
std::pair<typename Collection::iterator, bool> ret =
collection->insert(typename Collection::value_type(key, SharedPtr()));
if (ret.second) {
- ret.first->second.reset(new Element(arg));
+ ret.first->second.reset(new Element(args...));
}
return ret.first->second;
}
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index cf9250d..25b70c6 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -31,6 +31,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/pb_util.h"
@@ -40,6 +41,7 @@
using kudu::consensus::CommitMsg;
using kudu::consensus::ReplicateMsg;
using kudu::consensus::OperationType;
+using kudu::consensus::OpId;
using kudu::pb_util::SecureShortDebugString;
using kudu::tablet::TabletReplica;
using kudu::tserver::ParticipantOpPB;
@@ -65,7 +67,8 @@ void ParticipantOpState::AcquireTxnAndLock() {
DCHECK(!txn_lock_);
DCHECK(!txn_);
int64_t txn_id = request_->op().txn_id();
- txn_ = txn_participant_->GetOrCreateTransaction(txn_id);
+ txn_ = txn_participant_->GetOrCreateTransaction(txn_id,
+ tablet_replica_->log_anchor_registry().get());
txn_->AcquireWriteLock(&txn_lock_);
}
@@ -103,7 +106,7 @@ Status ParticipantOpState::ValidateOp() const {
return Status::OK();
}
-Status ParticipantOpState::PerformOp() {
+Status ParticipantOpState::PerformOp(const OpId& op_id) {
const auto& op = request()->op();
Txn* txn = txn_.get();
Status s;
@@ -112,19 +115,19 @@ Status ParticipantOpState::PerformOp() {
// metadata. When we begin validating write ops before committing, we'll
// need to populate the response with errors.
case ParticipantOpPB::BEGIN_TXN: {
- txn->BeginTransaction();
+ txn->BeginTransaction(op_id);
break;
}
case ParticipantOpPB::BEGIN_COMMIT: {
- txn->BeginCommit();
+ txn->BeginCommit(op_id);
break;
}
case ParticipantOpPB::FINALIZE_COMMIT: {
- txn->FinalizeCommit(op.finalized_commit_timestamp());
+ txn->FinalizeCommit(op_id, op.finalized_commit_timestamp());
break;
}
case ParticipantOpPB::ABORT_TXN: {
- txn->AbortTransaction();
+ txn->AbortTransaction(op_id);
break;
}
case ParticipantOpPB::UNKNOWN: {
@@ -163,7 +166,7 @@ Status ParticipantOp::Start() {
Status ParticipantOp::Apply(CommitMsg** commit_msg) {
TRACE_EVENT0("op", "ParticipantOp::Apply");
TRACE("APPLY: Starting.");
- CHECK_OK(state_->PerformOp());
+ CHECK_OK(state_->PerformOp(state()->op_id()));
*commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
(*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP);
TRACE("APPLY: Finished.");
diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h
index ea2ccc2..11e9e04 100644
--- a/src/kudu/tablet/ops/participant_op.h
+++ b/src/kudu/tablet/ops/participant_op.h
@@ -32,6 +32,10 @@
namespace kudu {
class rw_semaphore;
+namespace consensus {
+class OpId;
+} // namespace consensus
+
namespace tablet {
class TabletReplica;
@@ -61,7 +65,10 @@ class ParticipantOpState : public OpState {
// Performs the transaction state change requested by this op. Must be called
// while the transaction lock is held, i.e. between the calls to
// AcquireTxnAndLock() and ReleaseTxn().
- Status PerformOp();
+ //
+ // Anchors the given 'op_id' in the WAL, ensuring that subsequent bootstraps
+ // of the tablet's WAL will leave the transaction in the appropriate state.
+ Status PerformOp(const consensus::OpId& op_id);
// Releases the transaction and its lock.
void ReleaseTxn();
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index e4ef3b9..2b642c2 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -698,8 +698,6 @@ class Tablet {
// released after the schema change has been applied.
mutable rw_semaphore schema_lock_;
- TxnParticipant txn_participant_;
-
const Schema key_schema_;
scoped_refptr<TabletMetadata> metadata_;
@@ -729,6 +727,10 @@ class Tablet {
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
TabletMemTrackers mem_trackers_;
+ // Maintains the set of in-flight transactions, and any WAL anchors
+ // associated with them.
+ TxnParticipant txn_participant_;
+
scoped_refptr<MetricEntity> metric_entity_;
std::unique_ptr<TabletMetrics> metrics_;
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 976db5d..e691191 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1537,7 +1537,7 @@ Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_contex
});
// NOTE: don't bother validating the current state of the op. Presumably that
// happened the first time this op was written.
- RETURN_NOT_OK(op_state.PerformOp());
+ RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id()));
return AppendCommitMsg(commit_msg);
}
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc
index 3e0fb8a..d7a7560 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -20,11 +20,13 @@
#include <memory>
#include <ostream>
#include <string>
+#include <utility>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
@@ -38,12 +40,17 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/result_tracker.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
#include "kudu/util/test_macros.h"
using kudu::consensus::ConsensusBootstrapInfo;
@@ -53,11 +60,14 @@ using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;
using kudu::log::Log;
using kudu::log::LogOptions;
+using kudu::pb_util::SecureDebugString;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::ResultTracker;
using kudu::tablet::KuduTabletTest;
-using kudu::tablet::TabletReplica;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
using std::shared_ptr;
+using std::unique_ptr;
using std::string;
METRIC_DECLARE_entity(tablet);
@@ -69,6 +79,25 @@ namespace {
const MonoDelta kLeadershipTimeout = MonoDelta::FromSeconds(10);
} // anonymous namespace
+Status TabletReplicaTestBase::ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
+ WriteResponsePB resp;
+ unique_ptr<WriteOpState> op_state(new WriteOpState(replica,
+ &req,
+ nullptr, // No RequestIdPB
+ &resp));
+
+ CountDownLatch rpc_latch(1);
+ op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+ new LatchOpCompletionCallback<WriteResponsePB>(&rpc_latch, &resp)));
+
+ RETURN_NOT_OK(replica->SubmitWrite(std::move(op_state)));
+ rpc_latch.Wait();
+ if (resp.has_error()) {
+ return StatusFromPB(resp.error().status());
+ }
+ return Status::OK();
+}
+
void TabletReplicaTestBase::SetUp() {
KuduTabletTest::SetUp();
@@ -154,9 +183,16 @@ Status TabletReplicaTestBase::StartReplicaAndWaitUntilLeader(const ConsensusBoot
return tablet_replica_->consensus()->WaitUntilLeaderForTests(kLeadershipTimeout);
}
-Status TabletReplicaTestBase::RestartReplica() {
+Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
tablet_replica_->Shutdown();
tablet_replica_.reset();
+ // Reset the underlying harness's tablet if requested.
+ if (reset_tablet) {
+ CreateTestTablet();
+ // NOTE: the FsManager is owned by the harness, so refreshing the harness
+ // means we have to refresh anything that depends on its FsManager.
+ cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
+ }
RETURN_NOT_OK(SetUpReplica(/*new_replica=*/ false));
scoped_refptr<ConsensusMetadata> cmeta;
RETURN_NOT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &cmeta));
diff --git a/src/kudu/tablet/tablet_replica-test-base.h b/src/kudu/tablet/tablet_replica-test-base.h
index a302dd2..f47478c 100644
--- a/src/kudu/tablet/tablet_replica-test-base.h
+++ b/src/kudu/tablet/tablet_replica-test-base.h
@@ -39,6 +39,10 @@ namespace rpc {
class Messenger;
} // namespace rpc
+namespace tserver {
+class WriteRequestPB;
+} // namespace tserver
+
namespace tablet {
class TabletReplicaTestBase : public KuduTabletTest {
@@ -46,6 +50,11 @@ class TabletReplicaTestBase : public KuduTabletTest {
explicit TabletReplicaTestBase(const Schema& schema)
: KuduTabletTest(schema, TabletHarness::Options::ClockType::HYBRID_CLOCK),
dns_resolver_(new DnsResolver) {}
+
+ // Submits the given request to the given replica, waiting for the request to
+ // complete before returning.
+ static Status ExecuteWrite(TabletReplica* replica, const tserver::WriteRequestPB& req);
+
void SetUp() override;
void TearDown() override;
@@ -58,7 +67,12 @@ class TabletReplicaTestBase : public KuduTabletTest {
// Shuts down and restarts the tablet replica, bootstrapping it from its
// on-disk stores and WALs.
- Status RestartReplica();
+ //
+ // If 'reset_tablet' is set to true, resets the underlying tablet harness'
+ // Tablet instance. The 'false' default is for cases in which the tablet may
+ // have persisted a schema change; otherwise rebuilding the tablet may crash
+ // with a schema mismatch.
+ Status RestartReplica(bool reset_tablet = false);
const scoped_refptr<TabletReplica>& tablet_replica() const { return tablet_replica_; }
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index e6f31ef..32a2c62 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -140,24 +140,6 @@ class TabletReplicaTest : public TabletReplicaTestBase {
return Status::OK();
}
- Status ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
- unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
- unique_ptr<WriteOpState> op_state(new WriteOpState(replica,
- &req,
- nullptr, // No RequestIdPB
- resp.get()));
-
- CountDownLatch rpc_latch(1);
- op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
- new LatchOpCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
- RETURN_NOT_OK(replica->SubmitWrite(std::move(op_state)));
- rpc_latch.Wait();
- CHECK(!resp->has_error())
- << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
- return Status::OK();
- }
-
Status UpdateSchema(const SchemaPB& schema, int schema_version) {
AlterSchemaRequestPB alter;
alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
diff --git a/src/kudu/tablet/txn_participant-test-util.h b/src/kudu/tablet/txn_participant-test-util.h
index b4bed41..fcce4ee 100644
--- a/src/kudu/tablet/txn_participant-test-util.h
+++ b/src/kudu/tablet/txn_participant-test-util.h
@@ -35,13 +35,14 @@ const std::vector<tserver::ParticipantOpPB::ParticipantOpType> kCommitSequence =
tserver::ParticipantOpPB::FINALIZE_COMMIT,
};
-Status CallParticipantOp(TabletReplica* replica,
- int64_t txn_id,
- tserver::ParticipantOpPB::ParticipantOpType type,
- int64_t finalized_commit_timestamp,
- tserver::ParticipantResponsePB* resp) {
- tserver::ParticipantRequestPB req;
- auto* op = req.mutable_op();
+std::unique_ptr<ParticipantOpState> NewParticipantOp(
+ TabletReplica* replica,
+ int64_t txn_id,
+ tserver::ParticipantOpPB::ParticipantOpType type,
+ int64_t finalized_commit_timestamp,
+ tserver::ParticipantRequestPB* req,
+ tserver::ParticipantResponsePB* resp) {
+ auto* op = req->mutable_op();
op->set_txn_id(txn_id);
op->set_type(type);
if (type == tserver::ParticipantOpPB::FINALIZE_COMMIT) {
@@ -50,8 +51,19 @@ Status CallParticipantOp(TabletReplica* replica,
std::unique_ptr<ParticipantOpState> op_state(new ParticipantOpState(
replica,
replica->tablet()->txn_participant(),
- &req,
+ req,
resp));
+ return op_state;
+}
+
+Status CallParticipantOp(TabletReplica* replica,
+ int64_t txn_id,
+ tserver::ParticipantOpPB::ParticipantOpType type,
+ int64_t finalized_commit_timestamp,
+ tserver::ParticipantResponsePB* resp) {
+ tserver::ParticipantRequestPB req;
+ std::unique_ptr<ParticipantOpState> op_state =
+ NewParticipantOp(replica, txn_id, type, finalized_commit_timestamp, &req, resp);
CountDownLatch latch(1);
op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, resp)));
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index 5dbc925..44ded0e 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -21,55 +21,133 @@
#include <cstdint>
#include <map>
#include <memory>
+#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_tracker.h"
+#include "kudu/tablet/ops/participant_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica-test-base.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
+using kudu::consensus::CommitMsg;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::pb_util::SecureShortDebugString;
using kudu::tserver::ParticipantRequestPB;
using kudu::tserver::ParticipantResponsePB;
using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::WriteRequestPB;
using std::map;
using std::thread;
using std::unique_ptr;
using std::vector;
+DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(log_preallocate_segments);
+DECLARE_bool(log_async_preallocate_segments);
+
namespace kudu {
namespace tablet {
+namespace {
+Schema GetTestSchema() {
+ return Schema({ ColumnSchema("key", INT32) }, 1);
+}
+
+// A participant op that waits to start and finish applying based on input
+// latches.
+class DelayedParticipantOp : public ParticipantOp {
+ public:
+ DelayedParticipantOp(CountDownLatch* apply_started,
+ CountDownLatch* apply_continue,
+ unique_ptr<ParticipantOpState> state)
+ : ParticipantOp(std::move(state), consensus::LEADER),
+ apply_started_(apply_started),
+ apply_continue_(apply_continue) {}
+
+ Status Apply(CommitMsg** commit_msg) override {
+ apply_started_->CountDown();
+ LOG(INFO) << "Delaying apply...";
+ apply_continue_->Wait();
+ return ParticipantOp::Apply(commit_msg);
+ }
+
+ private:
+ CountDownLatch* apply_started_;
+ CountDownLatch* apply_continue_;
+};
+} // anonymous namespace
+
class TxnParticipantTest : public TabletReplicaTestBase {
public:
TxnParticipantTest()
- : TabletReplicaTestBase(Schema({ ColumnSchema("key", INT32) }, 1)) {}
+ : TabletReplicaTestBase(GetTestSchema()) {}
void SetUp() override {
+ // Some of these tests will test the durability semantics of participants.
+ // So we have finer-grained control of on-disk state, disable anything that
+ // might write to disk in the background.
+ FLAGS_enable_maintenance_manager = false;
+ FLAGS_log_preallocate_segments = false;
+ FLAGS_log_async_preallocate_segments = false;
+
NO_FATALS(TabletReplicaTestBase::SetUp());
ConsensusBootstrapInfo info;
ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
}
- const TxnParticipant* txn_participant() const {
+ Status Write(int key) {
+ WriteRequestPB req;
+ req.set_tablet_id(tablet_replica_->tablet_id());
+ const auto& schema = GetTestSchema();
+ RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
+ KuduPartialRow row(&schema);
+ RETURN_NOT_OK(row.SetInt32(0, key));
+ RowOperationsPBEncoder enc(req.mutable_row_operations());
+ enc.Add(RowOperationsPB::INSERT, row);
+ return ExecuteWrite(tablet_replica_.get(), req);
+ }
+
+ // Writes an op to the WAL, rolls over onto a new WAL segment, and flushes
+ // the MRS, leaving us with a new WAL segment that should be GC-able unless
+ // previous WAL segments are anchored.
+ Status WriteRolloverAndFlush(int* current_key) {
+ RETURN_NOT_OK(Write(*current_key++));
+ RETURN_NOT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+ RETURN_NOT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+ return tablet_replica_->tablet()->Flush();
+ }
+
+ TxnParticipant* txn_participant() {
return tablet_replica_->tablet()->txn_participant();
}
};
@@ -300,6 +378,174 @@ TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
{ kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
}), txn_participant()->GetTxnsForTests());
+ ASSERT_OK(RestartReplica());
+ ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+ { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+ }), txn_participant()->GetTxnsForTests());
+}
+
+// Test that each transaction has a single anchor that gets updated as
+// participant ops land.
+TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
+ int64_t expected_index = 1;
+ // Validates that each op in the given sequence updates the single anchor
+ // maintained for the transaction.
+ const auto check_participant_ops_are_anchored =
+ [&] (int64_t txn_id, const vector<ParticipantOpPB::ParticipantOpType>& ops) {
+ for (const auto& op : ops) {
+ ParticipantResponsePB resp;
+ ASSERT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op,
+ kDummyCommitTimestamp, &resp));
+ ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+ int64_t log_index = -1;
+ tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index);
+ ASSERT_EQ(++expected_index, log_index);
+ }
+ ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(txn_id));
+ ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+ };
+ NO_FATALS(check_participant_ops_are_anchored(1, {
+ ParticipantOpPB::BEGIN_TXN,
+ ParticipantOpPB::BEGIN_COMMIT,
+ ParticipantOpPB::FINALIZE_COMMIT
+ }));
+ NO_FATALS(check_participant_ops_are_anchored(2, {
+ ParticipantOpPB::BEGIN_TXN,
+ ParticipantOpPB::BEGIN_COMMIT,
+ ParticipantOpPB::ABORT_TXN
+ }));
+}
+
+// Test that participant ops are anchored, the anchors are updated as a
+// transaction's state gets updated.
+TEST_F(TxnParticipantTest, TestParticipantOpsAnchorWALs) {
+ const int64_t kTxnId = 1;
+ // First, perform some initial participant ops and roll the WAL segments so
+ // there are some candidates for WAL GC.
+ ParticipantResponsePB resp;
+ ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
+ kDummyCommitTimestamp, &resp));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+ ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+
+ // Write and flush some ops that would otherwise lead to GC-able WAL
+ // segments. Since there is an anchored participant op in the WAL before
+ // these writes, the tablet should not be GC-able.
+ int current_key = 0;
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ int64_t gcable_size;
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // WAL GC should proceed to clear out ops for both the transaction and the
+ // inserts.
+ ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp, &resp));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
+ kDummyCommitTimestamp, &resp));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_GT(gcable_size, 0);
+
+ ASSERT_OK(tablet_replica_->RunLogGC());
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // Ensure the transaction bootstraps to the expected state.
+ // NOTE: we need to reset the tablet here to reset the TxnParticipant.
+ // Otherwise, we might start the replica with a LogAnchor already registered.
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+ { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+ }), txn_participant()->GetTxnsForTests());
+
+ // Roll onto new WAL segments and add more segments so we can get to a state
+ // without any transaction ops in the WALs.
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+
+ // While the transaction still exists, we shouldn't GC anything.
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // Once we cull the transaction state in memory, we should be left with no
+ // trace of the transaction.
+ ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(kTxnId));
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_GT(gcable_size, 0);
+
+ ASSERT_OK(tablet_replica_->RunLogGC());
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // Do a final check that we bootstrap to the expected state (i.e. the
+ // transaction is culled).
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+}
+
+// Similar to the above test, but checking that in-flight ops anchor the WALs.
+TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
+ const int64_t kTxnId = 1;
+ ParticipantRequestPB req;
+ ParticipantResponsePB resp;
+ auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
+ kDummyCommitTimestamp, &req, &resp);
+ CountDownLatch latch(1);
+ CountDownLatch apply_start(1);
+ CountDownLatch apply_continue(1);
+ op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
+ new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, &resp)));
+ scoped_refptr<OpDriver> driver;
+ unique_ptr<DelayedParticipantOp> op(
+ new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state)));
+ ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver));
+ ASSERT_OK(driver->ExecuteAsync());
+ // Wait for the apply to start, indicating that we have persisted and
+ // replicated but not yet Raft committed the participant op.
+ apply_start.Wait();
+ ASSERT_TRUE(driver->GetOpId().IsInitialized());
+ ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
+
+ // Create some WAL segments to ensure some would-be-GC-able segments.
+ int current_key = 0;
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+
+ // Our participant op is still pending, and nothing should be GC-able.
+ ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
+ int64_t gcable_size;
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // Finish applying the participant op and proceed to completion.
+ apply_continue.CountDown();
+ latch.Wait();
+
+ // Even though we've completed the op, the replicate message should still be
+ // anchored while the in-memory transaction state exists on this participant.
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ ASSERT_OK(WriteRolloverAndFlush(&current_key));
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_EQ(0, gcable_size);
+
+ // The moment we update the in-memory state, we should be able to GC.
+ ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+ kDummyCommitTimestamp, &resp));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+ ASSERT_GT(gcable_size, 0);
+
+ // As a sanity check, ensure we get to the expected state if we reboot.
+ ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+ ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+ { kTxnId, Txn::kCommitInProgress, -1 }
+ }), txn_participant()->GetTxnsForTests());
}
} // namespace tablet
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index a3ef237..8ade9ec 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -27,6 +27,7 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+using kudu::log::LogAnchorRegistry;
using std::vector;
using strings::Substitute;
@@ -38,10 +39,11 @@ void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock) {
*txn_lock = std::move(l);
}
-scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id) {
+scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id,
+ LogAnchorRegistry* log_anchor_registry) {
// TODO(awong): add a 'user' field to these transactions.
std::lock_guard<simple_spinlock> l(lock_);
- return LookupOrInsertNewSharedPtr(&txns_, txn_id);
+ return LookupOrInsertNewSharedPtr(&txns_, txn_id, txn_id, log_anchor_registry);
}
void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
@@ -54,6 +56,20 @@ void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
}
}
+bool TxnParticipant::ClearIfCompleteForTests(int64_t txn_id) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ Txn* txn = FindPointeeOrNull(txns_, txn_id);
+ // NOTE: If this is the only reference to the transaction, we can forego
+ // locking the state.
+ if (txn && txn->HasOneRef() &&
+ (txn->state() == Txn::kAborted ||
+ txn->state() == Txn::kCommitted)) {
+ txns_.erase(txn_id);
+ return true;
+ }
+ return false;
+}
+
vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
vector<TxnEntry> txns;
{
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index c2c91c9..098d32b 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -24,6 +24,8 @@
#include <glog/logging.h>
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
@@ -79,7 +81,22 @@ class Txn : public RefCountedThreadSafe<Txn> {
__builtin_unreachable();
}
- Txn() : state_(kInitializing), commit_timestamp_(-1) {}
+ // Constructs a transaction instance with the given transaction ID and WAL
+ // anchor registry.
+ //
+ // The WAL anchor registry is used to ensure that the WAL segment that
+ // contains the participant op that replays to the transaction's current
+ // in-memory state is not GCed, allowing us to rebuild this transaction's
+ // in-memory state upon rebooting a server.
+ Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry)
+ : txn_id_(txn_id),
+ log_anchor_registry_(log_anchor_registry),
+ state_(kInitializing),
+ commit_timestamp_(-1) {}
+
+ ~Txn() {
+ CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&log_anchor_));
+ }
// Takes the state lock in write mode and returns it. As transaction state is
// meant to be driven via an op driver, lock acquisition is expected to be
@@ -132,17 +149,25 @@ class Txn : public RefCountedThreadSafe<Txn> {
// Applies the given state transitions. Should be called while holding the
// state lock in write mode after successfully replicating a participant op.
- void BeginTransaction() {
+ void BeginTransaction(const consensus::OpId& op_id) {
+ CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+ op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
SetState(kOpen);
}
- void BeginCommit() {
+ void BeginCommit(const consensus::OpId& op_id) {
+ CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+ op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
SetState(kCommitInProgress);
}
- void FinalizeCommit(int64_t finalized_commit_timestamp) {
+ void FinalizeCommit(const consensus::OpId& op_id, int64_t finalized_commit_timestamp) {
+ CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+ op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
SetState(kCommitted);
commit_timestamp_ = finalized_commit_timestamp;
}
- void AbortTransaction() {
+ void AbortTransaction(const consensus::OpId& op_id) {
+ CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+ op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
SetState(kAborted);
}
@@ -171,6 +196,14 @@ class Txn : public RefCountedThreadSafe<Txn> {
return Status::OK();
}
+ // Transaction ID for this transaction.
+ const int64_t txn_id_;
+
+ // Log anchor registry with which to anchor WAL segments, and an anchor to
+ // update upon applying a state change.
+ log::LogAnchorRegistry* log_anchor_registry_;
+ log::LogAnchor log_anchor_;
+
// Lock protecting access to 'state_' and 'commit_timestamp'. Ops that intend
// on mutating 'state_' must take this lock in write mode. Ops that intend on
// reading 'state_' and relying on it remaining constant must take this lock
@@ -198,7 +231,8 @@ class TxnParticipant {
// Gets the transaction state for the given transaction ID, creating it in
// the kInitializing state if one doesn't already exist.
- scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id);
+ scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id,
+ log::LogAnchorRegistry* log_anchor_registry);
// Removes the given transaction if it failed to initialize, e.g. the op that
// created it failed to replicate, leaving it in the kInitializing state but
@@ -209,6 +243,15 @@ class TxnParticipant {
// the Txn, we can thread-safely determine whether it has been abandoned.
void ClearIfInitFailed(int64_t txn_id);
+ // Removes the given transaction if it is in a terminal state, i.e. it is
+ // either kAborted or kCommitted, freeing any WAL anchors it may have held.
+ // Assumes there are no active op drivers updating state (i.e. that the
+ // transaction reference in our map is the only one).
+ //
+ // Returns whether or not this call actually cleared the transaction (i.e.
+ // returns 'false' if the transaction was not found).
+ bool ClearIfCompleteForTests(int64_t txn_id);
+
// Returns the transactions, sorted by transaction ID.
std::vector<TxnEntry> GetTxnsForTests() const;
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index b919ce3..f48f036 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -198,7 +198,7 @@ Status TabletCopySourceSession::InitOnce() {
// tablet copy loop due to a follower falling too far behind the
// leader's log when tablet copy is slow. The remote controls when
// this anchor is released by ending the tablet copy session.
- RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->UpdateRegistration(
+ RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->RegisterOrUpdate(
last_logged_opid->index(), anchor_owner_token, &log_anchor_));
LOG(INFO) << Substitute(