You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/09/01 22:24:59 UTC

[kudu] branch master updated: KUDU-2612 p9: anchor participant ops in WAL

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 68ee03c  KUDU-2612 p9: anchor participant ops in WAL
68ee03c is described below

commit 68ee03c2bd0bae950c08582910e675cda54b6ca7
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Aug 7 18:16:46 2020 -0700

    KUDU-2612 p9: anchor participant ops in WAL
    
    Participant ops will now anchor WAL segments in the same way that we
    anchor WAL segments for other in-memory state (e.g. MRS, DMS) that gets
    rebuilt upon recovery.
    
    Each participant op for a given transaction will update the single
    anchor associated with that transaction. As a transaction is
    transitioned from state to state, the transaction's prior participant op
    is unanchored in favor of anchoring the new op.
    
    Since there currently isn't a need to rid a participant of in-memory
    state associated with a transaction, this adds a test-only method to
    remove a committed or aborted transaction -- once this method is called, a
    transaction's anchor is removed and WAL GC may proceed to remove the
    latest participant op.
    
    Change-Id: I936f0a345c4b6095f0d99b6dd244e3092ae3f9d7
    Reviewed-on: http://gerrit.cloudera.org:8080/16358
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/log_anchor_registry-test.cc |  17 +-
 src/kudu/consensus/log_anchor_registry.cc      |  19 +-
 src/kudu/consensus/log_anchor_registry.h       |  12 +-
 src/kudu/gutil/map-util.h                      |  14 +-
 src/kudu/tablet/ops/participant_op.cc          |  17 +-
 src/kudu/tablet/ops/participant_op.h           |   9 +-
 src/kudu/tablet/tablet.h                       |   6 +-
 src/kudu/tablet/tablet_bootstrap.cc            |   2 +-
 src/kudu/tablet/tablet_replica-test-base.cc    |  40 +++-
 src/kudu/tablet/tablet_replica-test-base.h     |  16 +-
 src/kudu/tablet/tablet_replica-test.cc         |  18 --
 src/kudu/tablet/txn_participant-test-util.h    |  28 ++-
 src/kudu/tablet/txn_participant-test.cc        | 250 ++++++++++++++++++++++++-
 src/kudu/tablet/txn_participant.cc             |  20 +-
 src/kudu/tablet/txn_participant.h              |  55 +++++-
 src/kudu/tserver/tablet_copy_source_session.cc |   2 +-
 16 files changed, 449 insertions(+), 76 deletions(-)

diff --git a/src/kudu/consensus/log_anchor_registry-test.cc b/src/kudu/consensus/log_anchor_registry-test.cc
index a824152..04daf0c 100644
--- a/src/kudu/consensus/log_anchor_registry-test.cc
+++ b/src/kudu/consensus/log_anchor_registry-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -48,10 +49,24 @@ TEST_F(LogAnchorRegistryTest, TestUpdateRegistration) {
   ASSERT_FALSE(anchor.is_registered);
   ASSERT_FALSE(anchor.when_registered.Initialized());
   reg->Register(kInitialIndex, test_name, &anchor);
+  SCOPED_CLEANUP({
+    ASSERT_OK(reg->Unregister(&anchor));
+    ASSERT_FALSE(anchor.is_registered);
+  });
   ASSERT_TRUE(anchor.is_registered);
   ASSERT_TRUE(anchor.when_registered.Initialized());
-  ASSERT_OK(reg->UpdateRegistration(kInitialIndex + 1, test_name, &anchor));
+  ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 1, test_name, &anchor));
+  ASSERT_EQ(kInitialIndex + 1, anchor.log_index);
   ASSERT_OK(reg->Unregister(&anchor));
+  ASSERT_FALSE(anchor.is_registered);
+
+  ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 2, test_name, &anchor));
+  ASSERT_TRUE(anchor.is_registered);
+  ASSERT_EQ(kInitialIndex + 2, anchor.log_index);
+
+  ASSERT_OK(reg->RegisterOrUpdate(kInitialIndex + 3, test_name, &anchor));
+  ASSERT_TRUE(anchor.is_registered);
+  ASSERT_EQ(kInitialIndex + 3, anchor.log_index);
 }
 
 TEST_F(LogAnchorRegistryTest, TestDuplicateInserts) {
diff --git a/src/kudu/consensus/log_anchor_registry.cc b/src/kudu/consensus/log_anchor_registry.cc
index 648e2ab..a2e5c5e 100644
--- a/src/kudu/consensus/log_anchor_registry.cc
+++ b/src/kudu/consensus/log_anchor_registry.cc
@@ -50,12 +50,13 @@ void LogAnchorRegistry::Register(int64_t log_index,
   RegisterUnlocked(log_index, owner, anchor);
 }
 
-Status LogAnchorRegistry::UpdateRegistration(int64_t log_index,
-                                             const std::string& owner,
-                                             LogAnchor* anchor) {
+Status LogAnchorRegistry::RegisterOrUpdate(int64_t log_index,
+                                           const std::string& owner,
+                                           LogAnchor* anchor) {
   std::lock_guard<simple_spinlock> l(lock_);
-  RETURN_NOT_OK_PREPEND(UnregisterUnlocked(anchor),
-                        "Unable to swap registration, anchor not registered");
+  if (anchor->is_registered) {
+    RETURN_NOT_OK(UnregisterUnlocked(anchor));
+  }
   RegisterUnlocked(log_index, owner, anchor);
   return Status::OK();
 }
@@ -157,12 +158,10 @@ MinLogIndexAnchorer::~MinLogIndexAnchorer() {
 
 void MinLogIndexAnchorer::AnchorIfMinimum(int64_t log_index) {
   std::lock_guard<simple_spinlock> l(lock_);
-  if (PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
-    minimum_log_index_ = log_index;
-    registry_->Register(minimum_log_index_, owner_, &anchor_);
-  } else if (log_index < minimum_log_index_) {
+  if (log_index < minimum_log_index_ ||
+      PREDICT_FALSE(minimum_log_index_ == kInvalidOpIdIndex)) {
     minimum_log_index_ = log_index;
-    CHECK_OK(registry_->UpdateRegistration(minimum_log_index_, owner_, &anchor_));
+    CHECK_OK(registry_->RegisterOrUpdate(minimum_log_index_, owner_, &anchor_));
   }
 }
 
diff --git a/src/kudu/consensus/log_anchor_registry.h b/src/kudu/consensus/log_anchor_registry.h
index 56857d7..6ccb40f 100644
--- a/src/kudu/consensus/log_anchor_registry.h
+++ b/src/kudu/consensus/log_anchor_registry.h
@@ -51,13 +51,11 @@ class LogAnchorRegistry : public RefCountedThreadSafe<LogAnchorRegistry> {
   // anchor: Pointer to LogAnchor structure that will be populated on registration.
   void Register(int64_t log_index, const std::string& owner, LogAnchor* anchor);
 
-  // Atomically update the registration of an anchor to a new log index.
-  // Before: anchor must be registered with some log index.
-  // After: anchor is now registered using index 'log_index'.
-  // See Register().
-  Status UpdateRegistration(int64_t log_index,
-                            const std::string& owner,
-                            LogAnchor* anchor);
+  // Register interest for a particular log index, or update an anchor if it
+  // already exists.
+  Status RegisterOrUpdate(int64_t log_index,
+                          const std::string& owner,
+                          LogAnchor* anchor);
 
   // Release the anchor on a log index.
   // Note: anchor must be the original pointer passed to Register().
diff --git a/src/kudu/gutil/map-util.h b/src/kudu/gutil/map-util.h
index ae6d2d1..985b91c 100644
--- a/src/kudu/gutil/map-util.h
+++ b/src/kudu/gutil/map-util.h
@@ -616,22 +616,22 @@ LookupOrInsertNewSharedPtr(
 }
 
 // A variant of LookupOrInsertNewSharedPtr where the value is constructed using
-// a single-parameter constructor.  Note: the constructor argument is computed
-// even if it will not be used, so only values cheap to compute should be passed
-// here.  On the other hand it does not matter how expensive the construction of
-// the actual stored value is, as that only occurs if necessary.
-template <class Collection, class Arg>
+// constructor arguments.  Note: the constructor arguments are computed even if
+// they will not be used, so only values cheap to compute should be passed
+// here.  On the other hand it does not matter how expensive the construction
+// of the actual stored value is, as that only occurs if necessary.
+template <class Collection, class... Args>
 typename Collection::mapped_type&
 LookupOrInsertNewSharedPtr(
     Collection* const collection,
     const typename Collection::key_type& key,
-    const Arg& arg) {
+    const Args&... args) {
   typedef typename Collection::mapped_type SharedPtr;
   typedef typename Collection::mapped_type::element_type Element;
   std::pair<typename Collection::iterator, bool> ret =
       collection->insert(typename Collection::value_type(key, SharedPtr()));
   if (ret.second) {
-    ret.first->second.reset(new Element(arg));
+    ret.first->second.reset(new Element(args...));
   }
   return ret.first->second;
 }
diff --git a/src/kudu/tablet/ops/participant_op.cc b/src/kudu/tablet/ops/participant_op.cc
index cf9250d..25b70c6 100644
--- a/src/kudu/tablet/ops/participant_op.cc
+++ b/src/kudu/tablet/ops/participant_op.cc
@@ -31,6 +31,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_participant.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/pb_util.h"
@@ -40,6 +41,7 @@
 using kudu::consensus::CommitMsg;
 using kudu::consensus::ReplicateMsg;
 using kudu::consensus::OperationType;
+using kudu::consensus::OpId;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tablet::TabletReplica;
 using kudu::tserver::ParticipantOpPB;
@@ -65,7 +67,8 @@ void ParticipantOpState::AcquireTxnAndLock() {
   DCHECK(!txn_lock_);
   DCHECK(!txn_);
   int64_t txn_id = request_->op().txn_id();
-  txn_ = txn_participant_->GetOrCreateTransaction(txn_id);
+  txn_ = txn_participant_->GetOrCreateTransaction(txn_id,
+                                                  tablet_replica_->log_anchor_registry().get());
   txn_->AcquireWriteLock(&txn_lock_);
 }
 
@@ -103,7 +106,7 @@ Status ParticipantOpState::ValidateOp() const {
   return Status::OK();
 }
 
-Status ParticipantOpState::PerformOp() {
+Status ParticipantOpState::PerformOp(const OpId& op_id) {
   const auto& op = request()->op();
   Txn* txn = txn_.get();
   Status s;
@@ -112,19 +115,19 @@ Status ParticipantOpState::PerformOp() {
     // metadata. When we begin validating write ops before committing, we'll
     // need to populate the response with errors.
     case ParticipantOpPB::BEGIN_TXN: {
-      txn->BeginTransaction();
+      txn->BeginTransaction(op_id);
       break;
     }
     case ParticipantOpPB::BEGIN_COMMIT: {
-      txn->BeginCommit();
+      txn->BeginCommit(op_id);
       break;
     }
     case ParticipantOpPB::FINALIZE_COMMIT: {
-      txn->FinalizeCommit(op.finalized_commit_timestamp());
+      txn->FinalizeCommit(op_id, op.finalized_commit_timestamp());
       break;
     }
     case ParticipantOpPB::ABORT_TXN: {
-      txn->AbortTransaction();
+      txn->AbortTransaction(op_id);
       break;
     }
     case ParticipantOpPB::UNKNOWN: {
@@ -163,7 +166,7 @@ Status ParticipantOp::Start() {
 Status ParticipantOp::Apply(CommitMsg** commit_msg) {
   TRACE_EVENT0("op", "ParticipantOp::Apply");
   TRACE("APPLY: Starting.");
-  CHECK_OK(state_->PerformOp());
+  CHECK_OK(state_->PerformOp(state()->op_id()));
   *commit_msg = google::protobuf::Arena::CreateMessage<CommitMsg>(state_->pb_arena());
   (*commit_msg)->set_op_type(OperationType::PARTICIPANT_OP);
   TRACE("APPLY: Finished.");
diff --git a/src/kudu/tablet/ops/participant_op.h b/src/kudu/tablet/ops/participant_op.h
index ea2ccc2..11e9e04 100644
--- a/src/kudu/tablet/ops/participant_op.h
+++ b/src/kudu/tablet/ops/participant_op.h
@@ -32,6 +32,10 @@
 namespace kudu {
 class rw_semaphore;
 
+namespace consensus {
+class OpId;
+} // namespace consensus
+
 namespace tablet {
 class TabletReplica;
 
@@ -61,7 +65,10 @@ class ParticipantOpState : public OpState {
   // Performs the transaction state change requested by this op. Must be called
   // while the transaction lock is held, i.e. between the calls to
   // AcquireTxnAndLock() and ReleaseTxn().
-  Status PerformOp();
+  //
+  // Anchors the given 'op_id' in the WAL, ensuring that subsequent bootstraps
+  // of the tablet's WAL will leave the transaction in the appropriate state.
+  Status PerformOp(const consensus::OpId& op_id);
 
   // Releases the transaction and its lock.
   void ReleaseTxn();
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index e4ef3b9..2b642c2 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -698,8 +698,6 @@ class Tablet {
   // released after the schema change has been applied.
   mutable rw_semaphore schema_lock_;
 
-  TxnParticipant txn_participant_;
-
   const Schema key_schema_;
 
   scoped_refptr<TabletMetadata> metadata_;
@@ -729,6 +727,10 @@ class Tablet {
   scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
   TabletMemTrackers mem_trackers_;
 
+  // Maintains the set of in-flight transactions, and any WAL anchors
+  // associated with them.
+  TxnParticipant txn_participant_;
+
   scoped_refptr<MetricEntity> metric_entity_;
   std::unique_ptr<TabletMetrics> metrics_;
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 976db5d..e691191 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1537,7 +1537,7 @@ Status TabletBootstrap::PlayTxnParticipantOpRequest(const IOContext* /*io_contex
   });
   // NOTE: don't bother validating the current state of the op. Presumably that
   // happened the first time this op was written.
-  RETURN_NOT_OK(op_state.PerformOp());
+  RETURN_NOT_OK(op_state.PerformOp(replicate_msg->id()));
   return AppendCommitMsg(commit_msg);
 }
 
diff --git a/src/kudu/tablet/tablet_replica-test-base.cc b/src/kudu/tablet/tablet_replica-test-base.cc
index 3e0fb8a..d7a7560 100644
--- a/src/kudu/tablet/tablet_replica-test-base.cc
+++ b/src/kudu/tablet/tablet_replica-test-base.cc
@@ -20,11 +20,13 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
@@ -38,12 +40,17 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/write_op.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
 #include "kudu/util/test_macros.h"
 
 using kudu::consensus::ConsensusBootstrapInfo;
@@ -53,11 +60,14 @@ using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
 using kudu::log::Log;
 using kudu::log::LogOptions;
+using kudu::pb_util::SecureDebugString;
 using kudu::rpc::MessengerBuilder;
 using kudu::rpc::ResultTracker;
 using kudu::tablet::KuduTabletTest;
-using kudu::tablet::TabletReplica;
+using kudu::tserver::WriteRequestPB;
+using kudu::tserver::WriteResponsePB;
 using std::shared_ptr;
+using std::unique_ptr;
 using std::string;
 
 METRIC_DECLARE_entity(tablet);
@@ -69,6 +79,25 @@ namespace {
 const MonoDelta kLeadershipTimeout = MonoDelta::FromSeconds(10);
 } // anonymous namespace
 
+Status TabletReplicaTestBase::ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
+  WriteResponsePB resp;
+  unique_ptr<WriteOpState> op_state(new WriteOpState(replica,
+                                                     &req,
+                                                     nullptr, // No RequestIdPB
+                                                     &resp));
+
+  CountDownLatch rpc_latch(1);
+  op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
+      new LatchOpCompletionCallback<WriteResponsePB>(&rpc_latch, &resp)));
+
+  RETURN_NOT_OK(replica->SubmitWrite(std::move(op_state)));
+  rpc_latch.Wait();
+  if (resp.has_error()) {
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
 void TabletReplicaTestBase::SetUp() {
   KuduTabletTest::SetUp();
 
@@ -154,9 +183,16 @@ Status TabletReplicaTestBase::StartReplicaAndWaitUntilLeader(const ConsensusBoot
   return tablet_replica_->consensus()->WaitUntilLeaderForTests(kLeadershipTimeout);
 }
 
-Status TabletReplicaTestBase::RestartReplica() {
+Status TabletReplicaTestBase::RestartReplica(bool reset_tablet) {
   tablet_replica_->Shutdown();
   tablet_replica_.reset();
+  // Reset the underlying harness's tablet if requested.
+  if (reset_tablet) {
+    CreateTestTablet();
+    // NOTE: the FsManager is owned by the harness, so refreshing the harness
+    // means we have to refresh anything that depends on its FsManager.
+    cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager()));
+  }
   RETURN_NOT_OK(SetUpReplica(/*new_replica=*/ false));
   scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(cmeta_manager_->Load(tablet_replica_->tablet_id(), &cmeta));
diff --git a/src/kudu/tablet/tablet_replica-test-base.h b/src/kudu/tablet/tablet_replica-test-base.h
index a302dd2..f47478c 100644
--- a/src/kudu/tablet/tablet_replica-test-base.h
+++ b/src/kudu/tablet/tablet_replica-test-base.h
@@ -39,6 +39,10 @@ namespace rpc {
 class Messenger;
 }  // namespace rpc
 
+namespace tserver {
+class WriteRequestPB;
+}  // namespace tserver
+
 namespace tablet {
 
 class TabletReplicaTestBase : public KuduTabletTest {
@@ -46,6 +50,11 @@ class TabletReplicaTestBase : public KuduTabletTest {
   explicit TabletReplicaTestBase(const Schema& schema)
       : KuduTabletTest(schema, TabletHarness::Options::ClockType::HYBRID_CLOCK),
         dns_resolver_(new DnsResolver) {}
+
+  // Submits the given request to the given replica, waiting for the request to
+  // complete before returning.
+  static Status ExecuteWrite(TabletReplica* replica, const tserver::WriteRequestPB& req);
+
   void SetUp() override;
   void TearDown() override;
 
@@ -58,7 +67,12 @@ class TabletReplicaTestBase : public KuduTabletTest {
 
   // Shuts down and restarts the tablet replica, bootstrapping it from its
   // on-disk stores and WALs.
-  Status RestartReplica();
+  //
+  // If 'reset_tablet' is set to true, resets the underlying tablet harness'
+  // Tablet instance. The 'false' default is for cases in which the tablet may
+  // have persisted a schema change; otherwise rebuilding the tablet may crash
+  // with a schema mismatch.
+  Status RestartReplica(bool reset_tablet = false);
 
   const scoped_refptr<TabletReplica>& tablet_replica() const { return tablet_replica_; }
 
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index e6f31ef..32a2c62 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -140,24 +140,6 @@ class TabletReplicaTest : public TabletReplicaTestBase {
     return Status::OK();
   }
 
-  Status ExecuteWrite(TabletReplica* replica, const WriteRequestPB& req) {
-    unique_ptr<WriteResponsePB> resp(new WriteResponsePB());
-    unique_ptr<WriteOpState> op_state(new WriteOpState(replica,
-                                                       &req,
-                                                       nullptr, // No RequestIdPB
-                                                       resp.get()));
-
-    CountDownLatch rpc_latch(1);
-    op_state->set_completion_callback(unique_ptr<OpCompletionCallback>(
-        new LatchOpCompletionCallback<WriteResponsePB>(&rpc_latch, resp.get())));
-
-    RETURN_NOT_OK(replica->SubmitWrite(std::move(op_state)));
-    rpc_latch.Wait();
-    CHECK(!resp->has_error())
-        << "\nReq:\n" << SecureDebugString(req) << "Resp:\n" << SecureDebugString(*resp);
-    return Status::OK();
-  }
-
   Status UpdateSchema(const SchemaPB& schema, int schema_version) {
     AlterSchemaRequestPB alter;
     alter.set_dest_uuid(tablet()->metadata()->fs_manager()->uuid());
diff --git a/src/kudu/tablet/txn_participant-test-util.h b/src/kudu/tablet/txn_participant-test-util.h
index b4bed41..fcce4ee 100644
--- a/src/kudu/tablet/txn_participant-test-util.h
+++ b/src/kudu/tablet/txn_participant-test-util.h
@@ -35,13 +35,14 @@ const std::vector<tserver::ParticipantOpPB::ParticipantOpType> kCommitSequence =
   tserver::ParticipantOpPB::FINALIZE_COMMIT,
 };
 
-Status CallParticipantOp(TabletReplica* replica,
-                         int64_t txn_id,
-                         tserver::ParticipantOpPB::ParticipantOpType type,
-                         int64_t finalized_commit_timestamp,
-                         tserver::ParticipantResponsePB* resp) {
-  tserver::ParticipantRequestPB req;
-  auto* op = req.mutable_op();
+std::unique_ptr<ParticipantOpState> NewParticipantOp(
+    TabletReplica* replica,
+    int64_t txn_id,
+    tserver::ParticipantOpPB::ParticipantOpType type,
+    int64_t finalized_commit_timestamp,
+    tserver::ParticipantRequestPB* req,
+    tserver::ParticipantResponsePB* resp) {
+  auto* op = req->mutable_op();
   op->set_txn_id(txn_id);
   op->set_type(type);
   if (type == tserver::ParticipantOpPB::FINALIZE_COMMIT) {
@@ -50,8 +51,19 @@ Status CallParticipantOp(TabletReplica* replica,
   std::unique_ptr<ParticipantOpState> op_state(new ParticipantOpState(
       replica,
       replica->tablet()->txn_participant(),
-      &req,
+      req,
       resp));
+  return op_state;
+}
+
+Status CallParticipantOp(TabletReplica* replica,
+                         int64_t txn_id,
+                         tserver::ParticipantOpPB::ParticipantOpType type,
+                         int64_t finalized_commit_timestamp,
+                         tserver::ParticipantResponsePB* resp) {
+  tserver::ParticipantRequestPB req;
+  std::unique_ptr<ParticipantOpState> op_state =
+    NewParticipantOp(replica, txn_id, type, finalized_commit_timestamp, &req, resp);
   CountDownLatch latch(1);
   op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
       new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, resp)));
diff --git a/src/kudu/tablet/txn_participant-test.cc b/src/kudu/tablet/txn_participant-test.cc
index 5dbc925..44ded0e 100644
--- a/src/kudu/tablet/txn_participant-test.cc
+++ b/src/kudu/tablet/txn_participant-test.cc
@@ -21,55 +21,133 @@
 #include <cstdint>
 #include <map>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <thread>
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/row_operations.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/log.h"
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/tablet/ops/op.h"
+#include "kudu/tablet/ops/op_driver.h"
+#include "kudu/tablet/ops/op_tracker.h"
+#include "kudu/tablet/ops/participant_op.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica-test-base.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tablet/txn_participant-test-util.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.pb.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
+using kudu::consensus::CommitMsg;
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::tserver::ParticipantRequestPB;
 using kudu::tserver::ParticipantResponsePB;
 using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::WriteRequestPB;
 using std::map;
 using std::thread;
 using std::unique_ptr;
 using std::vector;
 
+DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(log_preallocate_segments);
+DECLARE_bool(log_async_preallocate_segments);
+
 namespace kudu {
 namespace tablet {
 
+namespace {
+Schema GetTestSchema() {
+  return Schema({ ColumnSchema("key", INT32) }, 1);
+}
+
+// A participant op that waits to start and finish applying based on input
+// latches.
+class DelayedParticipantOp : public ParticipantOp {
+ public:
+  DelayedParticipantOp(CountDownLatch* apply_started,
+                       CountDownLatch* apply_continue,
+                       unique_ptr<ParticipantOpState> state)
+    : ParticipantOp(std::move(state), consensus::LEADER),
+      apply_started_(apply_started),
+      apply_continue_(apply_continue) {}
+
+  Status Apply(CommitMsg** commit_msg) override {
+    apply_started_->CountDown();
+    LOG(INFO) << "Delaying apply...";
+    apply_continue_->Wait();
+    return ParticipantOp::Apply(commit_msg);
+  }
+
+ private:
+  CountDownLatch* apply_started_;
+  CountDownLatch* apply_continue_;
+};
+} // anonymous namespace
+
 class TxnParticipantTest : public TabletReplicaTestBase {
  public:
   TxnParticipantTest()
-      : TabletReplicaTestBase(Schema({ ColumnSchema("key", INT32) }, 1)) {}
+      : TabletReplicaTestBase(GetTestSchema()) {}
 
   void SetUp() override {
+    // Some of these tests will test the durability semantics of participants.
+    // So we have finer-grained control of on-disk state, disable anything that
+    // might write to disk in the background.
+    FLAGS_enable_maintenance_manager = false;
+    FLAGS_log_preallocate_segments = false;
+    FLAGS_log_async_preallocate_segments = false;
+
     NO_FATALS(TabletReplicaTestBase::SetUp());
     ConsensusBootstrapInfo info;
     ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
   }
 
-  const TxnParticipant* txn_participant() const {
+  Status Write(int key) {
+    WriteRequestPB req;
+    req.set_tablet_id(tablet_replica_->tablet_id());
+    const auto& schema = GetTestSchema();
+    RETURN_NOT_OK(SchemaToPB(schema, req.mutable_schema()));
+    KuduPartialRow row(&schema);
+    RETURN_NOT_OK(row.SetInt32(0, key));
+    RowOperationsPBEncoder enc(req.mutable_row_operations());
+    enc.Add(RowOperationsPB::INSERT, row);
+    return ExecuteWrite(tablet_replica_.get(), req);
+  }
+
+  // Inserts the key '*current_key' (post-incrementing it), rolls the WAL over
+  // onto a new segment, and flushes the MRS, leaving us with a WAL segment
+  // that should be GC-able unless previous WAL segments are anchored.
+  Status WriteRolloverAndFlush(int* current_key) {
+    RETURN_NOT_OK(Write((*current_key)++));
+    RETURN_NOT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+    RETURN_NOT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+    return tablet_replica_->tablet()->Flush();
+  }
+
+  TxnParticipant* txn_participant() {
     return tablet_replica_->tablet()->txn_participant();
   }
 };
@@ -300,6 +378,174 @@ TEST_F(TxnParticipantTest, TestReplayParticipantOps) {
   ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
       { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
   }), txn_participant()->GetTxnsForTests());
+  ASSERT_OK(RestartReplica());
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+  }), txn_participant()->GetTxnsForTests());
+}
+
+// Test that each transaction has a single anchor that gets updated as
+// participant ops land.
+TEST_F(TxnParticipantTest, TestAllOpsRegisterAnchors) {
+  int64_t expected_index = 1;
+  // Validates that each op in the given sequence updates the single anchor
+  // maintained for the transaction.
+  const auto check_participant_ops_are_anchored =
+    [&] (int64_t txn_id, const vector<ParticipantOpPB::ParticipantOpType>& ops) {
+      for (const auto& op : ops) {
+        ParticipantResponsePB resp;
+        ASSERT_OK(CallParticipantOp(tablet_replica_.get(), txn_id, op,
+                                    kDummyCommitTimestamp, &resp));
+        ASSERT_EQ(1, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+        int64_t log_index = -1;
+        tablet_replica_->log_anchor_registry()->GetEarliestRegisteredLogIndex(&log_index);
+        ASSERT_EQ(++expected_index, log_index);
+      }
+      ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(txn_id));
+      ASSERT_EQ(0, tablet_replica_->log_anchor_registry()->GetAnchorCountForTests());
+    };
+  NO_FATALS(check_participant_ops_are_anchored(1, {
+      ParticipantOpPB::BEGIN_TXN,
+      ParticipantOpPB::BEGIN_COMMIT,
+      ParticipantOpPB::FINALIZE_COMMIT
+  }));
+  NO_FATALS(check_participant_ops_are_anchored(2, {
+      ParticipantOpPB::BEGIN_TXN,
+      ParticipantOpPB::BEGIN_COMMIT,
+      ParticipantOpPB::ABORT_TXN
+  }));
+}
+
+// Test that participant ops are anchored, and that the anchors are updated as
+// a transaction's state gets updated.
+TEST_F(TxnParticipantTest, TestParticipantOpsAnchorWALs) {
+  const int64_t kTxnId = 1;
+  // First, perform some initial participant ops and roll the WAL segments so
+  // there are some candidates for WAL GC.
+  ParticipantResponsePB resp;
+  ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
+                              kDummyCommitTimestamp, &resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_OK(tablet_replica_->log()->WaitUntilAllFlushed());
+  ASSERT_OK(tablet_replica_->log()->AllocateSegmentAndRollOverForTests());
+
+  // Write and flush some ops that would otherwise lead to GC-able WAL
+  // segments. Since there is an anchored participant op in the WAL before
+  // these writes, the tablet should not be GC-able.
+  int current_key = 0;
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  int64_t gcable_size;
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // WAL GC should proceed to clear out ops for both the transaction and the
+  // inserts.
+  ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+                              kDummyCommitTimestamp, &resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::FINALIZE_COMMIT,
+                              kDummyCommitTimestamp, &resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_GT(gcable_size, 0);
+
+  ASSERT_OK(tablet_replica_->RunLogGC());
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // Ensure the transaction bootstraps to the expected state.
+  // NOTE: we need to reset the tablet here to reset the TxnParticipant.
+  // Otherwise, we might start the replica with a LogAnchor already registered.
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitted, kDummyCommitTimestamp }
+  }), txn_participant()->GetTxnsForTests());
+
+  // Roll onto new WAL segments and add more segments so we can get to a state
+  // without any transaction ops in the WALs.
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+
+  // While the transaction still exists, we shouldn't GC anything.
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // Once we cull the transaction state in memory, we should be left with no
+  // trace of the transaction.
+  ASSERT_TRUE(txn_participant()->ClearIfCompleteForTests(kTxnId));
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_GT(gcable_size, 0);
+
+  ASSERT_OK(tablet_replica_->RunLogGC());
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // Do a final check that we bootstrap to the expected state (i.e. the
+  // transaction is culled).
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  ASSERT_TRUE(txn_participant()->GetTxnsForTests().empty());
+}
+
+// Similar to the above test, but checking that in-flight ops anchor the WALs.
+TEST_F(TxnParticipantTest, TestActiveParticipantOpsAnchorWALs) {
+  const int64_t kTxnId = 1;
+  ParticipantRequestPB req;
+  ParticipantResponsePB resp;
+  auto op_state = NewParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_TXN,
+                                   kDummyCommitTimestamp, &req, &resp);
+  CountDownLatch latch(1);
+  CountDownLatch apply_start(1);
+  CountDownLatch apply_continue(1);
+  op_state->set_completion_callback(std::unique_ptr<OpCompletionCallback>(
+      new LatchOpCompletionCallback<tserver::ParticipantResponsePB>(&latch, &resp)));
+  scoped_refptr<OpDriver> driver;
+  unique_ptr<DelayedParticipantOp> op(
+      new DelayedParticipantOp(&apply_start, &apply_continue, std::move(op_state)));
+  ASSERT_OK(tablet_replica_->NewLeaderOpDriver(std::move(op), &driver));
+  ASSERT_OK(driver->ExecuteAsync());
+  // Wait for the apply to start, indicating that we have persisted and
+  // replicated but not yet Raft committed the participant op.
+  apply_start.Wait();
+  ASSERT_TRUE(driver->GetOpId().IsInitialized());
+  ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
+
+  // Create some WAL segments to ensure some would-be-GC-able segments.
+  int current_key = 0;
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+
+  // Our participant op is still pending, and nothing should be GC-able.
+  ASSERT_EQ(1, tablet_replica_->op_tracker()->GetNumPendingForTests());
+  int64_t gcable_size;
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // Finish applying the participant op and proceed to completion.
+  apply_continue.CountDown();
+  latch.Wait();
+
+  // Even though we've completed the op, the replicate message should still be
+  // anchored while the in-memory transaction state exists on this participant.
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  ASSERT_OK(WriteRolloverAndFlush(&current_key));
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_EQ(0, gcable_size);
+
+  // The moment we update the in-memory state, we should be able to GC.
+  ASSERT_OK(CallParticipantOp(tablet_replica_.get(), kTxnId, ParticipantOpPB::BEGIN_COMMIT,
+                              kDummyCommitTimestamp, &resp));
+  ASSERT_FALSE(resp.has_error());
+  ASSERT_OK(tablet_replica_->GetGCableDataSize(&gcable_size));
+  ASSERT_GT(gcable_size, 0);
+
+  // As a sanity check, ensure we get to the expected state if we reboot.
+  ASSERT_OK(RestartReplica(/*reset_tablet*/true));
+  ASSERT_EQ(vector<TxnParticipant::TxnEntry>({
+      { kTxnId, Txn::kCommitInProgress, -1 }
+  }), txn_participant()->GetTxnsForTests());
 }
 
 } // namespace tablet
diff --git a/src/kudu/tablet/txn_participant.cc b/src/kudu/tablet/txn_participant.cc
index a3ef237..8ade9ec 100644
--- a/src/kudu/tablet/txn_participant.cc
+++ b/src/kudu/tablet/txn_participant.cc
@@ -27,6 +27,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
 
+using kudu::log::LogAnchorRegistry;
 using std::vector;
 using strings::Substitute;
 
@@ -38,10 +39,11 @@ void Txn::AcquireWriteLock(std::unique_lock<rw_semaphore>* txn_lock) {
   *txn_lock = std::move(l);
 }
 
-scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id) {
+scoped_refptr<Txn> TxnParticipant::GetOrCreateTransaction(int64_t txn_id,
+                                                          LogAnchorRegistry* log_anchor_registry) {
   // TODO(awong): add a 'user' field to these transactions.
   std::lock_guard<simple_spinlock> l(lock_);
-  return LookupOrInsertNewSharedPtr(&txns_, txn_id);
+  return LookupOrInsertNewSharedPtr(&txns_, txn_id, txn_id, log_anchor_registry);
 }
 
 void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
@@ -54,6 +56,20 @@ void TxnParticipant::ClearIfInitFailed(int64_t txn_id) {
   }
 }
 
+bool TxnParticipant::ClearIfCompleteForTests(int64_t txn_id) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  Txn* txn = FindPointeeOrNull(txns_, txn_id);
+  // NOTE: If this is the only reference to the transaction, we can forego
+  // locking the state.
+  if (txn && txn->HasOneRef() &&
+      (txn->state() == Txn::kAborted ||
+       txn->state() == Txn::kCommitted)) {
+    txns_.erase(txn_id);
+    return true;
+  }
+  return false;
+}
+
 vector<TxnParticipant::TxnEntry> TxnParticipant::GetTxnsForTests() const {
   vector<TxnEntry> txns;
   {
diff --git a/src/kudu/tablet/txn_participant.h b/src/kudu/tablet/txn_participant.h
index c2c91c9..098d32b 100644
--- a/src/kudu/tablet/txn_participant.h
+++ b/src/kudu/tablet/txn_participant.h
@@ -24,6 +24,8 @@
 
 #include <glog/logging.h>
 
+#include "kudu/consensus/log_anchor_registry.h"
+#include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -79,7 +81,22 @@ class Txn : public RefCountedThreadSafe<Txn> {
     __builtin_unreachable();
   }
 
-  Txn() : state_(kInitializing), commit_timestamp_(-1) {}
+  // Constructs a transaction instance with the given transaction ID and WAL
+  // anchor registry.
+  //
+  // The WAL anchor registry is used to ensure that the WAL segment that
+  // contains the participant op that replays to the transaction's current
+  // in-memory state is not GCed, allowing us to rebuild this transaction's
+  // in-memory state upon rebooting a server.
+  Txn(int64_t txn_id, log::LogAnchorRegistry* log_anchor_registry)
+      : txn_id_(txn_id),
+        log_anchor_registry_(log_anchor_registry),
+        state_(kInitializing),
+        commit_timestamp_(-1) {}
+
+  ~Txn() {
+    CHECK_OK(log_anchor_registry_->UnregisterIfAnchored(&log_anchor_));
+  }
 
   // Takes the state lock in write mode and returns it. As transaction state is
   // meant to be driven via an op driver, lock acquisition is expected to be
@@ -132,17 +149,25 @@ class Txn : public RefCountedThreadSafe<Txn> {
 
   // Applies the given state transitions. Should be called while holding the
   // state lock in write mode after successfully replicating a participant op.
-  void BeginTransaction() {
+  void BeginTransaction(const consensus::OpId& op_id) {
+    CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+        op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
     SetState(kOpen);
   }
-  void BeginCommit() {
+  void BeginCommit(const consensus::OpId& op_id) {
+    CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+        op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
     SetState(kCommitInProgress);
   }
-  void FinalizeCommit(int64_t finalized_commit_timestamp) {
+  void FinalizeCommit(const consensus::OpId& op_id, int64_t finalized_commit_timestamp) {
+    CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+        op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
     SetState(kCommitted);
     commit_timestamp_ = finalized_commit_timestamp;
   }
-  void AbortTransaction() {
+  void AbortTransaction(const consensus::OpId& op_id) {
+    CHECK_OK(log_anchor_registry_->RegisterOrUpdate(
+        op_id.index(), strings::Substitute("Txn-$0-$1", txn_id_, this), &log_anchor_));
     SetState(kAborted);
   }
 
@@ -171,6 +196,14 @@ class Txn : public RefCountedThreadSafe<Txn> {
     return Status::OK();
   }
 
+  // Transaction ID for this transaction.
+  const int64_t txn_id_;
+
+  // Log anchor registry with which to anchor WAL segments, and an anchor to
+  // update upon applying a state change.
+  log::LogAnchorRegistry* log_anchor_registry_;
+  log::LogAnchor log_anchor_;
+
   // Lock protecting access to 'state_' and 'commit_timestamp'. Ops that intend
   // on mutating 'state_' must take this lock in write mode. Ops that intend on
   // reading 'state_' and relying on it remaining constant must take this lock
@@ -198,7 +231,8 @@ class TxnParticipant {
 
   // Gets the transaction state for the given transaction ID, creating it in
   // the kInitializing state if one doesn't already exist.
-  scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id);
+  scoped_refptr<Txn> GetOrCreateTransaction(int64_t txn_id,
+                                            log::LogAnchorRegistry* log_anchor_registry);
 
   // Removes the given transaction if it failed to initialize, e.g. the op that
   // created it failed to replicate, leaving it in the kInitializing state but
@@ -209,6 +243,15 @@ class TxnParticipant {
   // the Txn, we can thread-safely determine whether it has been abandoned.
   void ClearIfInitFailed(int64_t txn_id);
 
+  // Removes the given transaction if it is in a terminal state, i.e. it is
+  // either kAborted or kCommitted, freeing any WAL anchors it may have held.
+  // Assumes there are no active op drivers updating state (i.e. that the
+  // transaction reference in our map is the only one).
+  //
+  // Returns whether or not this call actually cleared the transaction (e.g.
+  // returns 'false' if the transaction was not found or is not yet complete).
+  bool ClearIfCompleteForTests(int64_t txn_id);
+
   // Returns the transactions, sorted by transaction ID.
   std::vector<TxnEntry> GetTxnsForTests() const;
 
diff --git a/src/kudu/tserver/tablet_copy_source_session.cc b/src/kudu/tserver/tablet_copy_source_session.cc
index b919ce3..f48f036 100644
--- a/src/kudu/tserver/tablet_copy_source_session.cc
+++ b/src/kudu/tserver/tablet_copy_source_session.cc
@@ -198,7 +198,7 @@ Status TabletCopySourceSession::InitOnce() {
   // tablet copy loop due to a follower falling too far behind the
   // leader's log when tablet copy is slow. The remote controls when
   // this anchor is released by ending the tablet copy session.
-  RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->UpdateRegistration(
+  RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->RegisterOrUpdate(
       last_logged_opid->index(), anchor_owner_token, &log_anchor_));
 
   LOG(INFO) << Substitute(